
localstack / localstack — build 21697093787 (push, github, committer: web-flow)
04 Feb 2026 09:56PM UTC — coverage: 86.962% (-0.004% from 86.966%)
Commit: improve system information sent in session and container_info (#13680)

10 of 17 new or added lines in 2 files covered (58.82%).
222 existing lines in 17 files now uncovered.
70560 of 81139 relevant lines covered (86.96%).
0.87 hits per line.

Source File

/localstack-core/localstack/services/s3/provider.py — 95.9% of lines covered
import base64
import contextlib
import copy
import datetime
import json
import logging
import re
from collections import defaultdict
from inspect import signature
from io import BytesIO
from operator import itemgetter
from threading import RLock
from typing import IO
from urllib import parse as urlparse
from zoneinfo import ZoneInfo

from localstack import config
from localstack.aws.api import CommonServiceException, RequestContext, handler
from localstack.aws.api.s3 import (
    MFA,
    AbortMultipartUploadOutput,
    AccelerateConfiguration,
    AccessControlPolicy,
    AccessDenied,
    AccountId,
    AnalyticsConfiguration,
    AnalyticsId,
    AuthorizationHeaderMalformed,
    BadDigest,
    Body,
    Bucket,
    BucketAlreadyExists,
    BucketAlreadyOwnedByYou,
    BucketCannedACL,
    BucketLifecycleConfiguration,
    BucketLocationConstraint,
    BucketLoggingStatus,
    BucketName,
    BucketNotEmpty,
    BucketRegion,
    BucketVersioningStatus,
    BypassGovernanceRetention,
    ChecksumAlgorithm,
    ChecksumCRC32,
    ChecksumCRC32C,
    ChecksumCRC64NVME,
    ChecksumSHA1,
    ChecksumSHA256,
    ChecksumType,
    CommonPrefix,
    CompletedMultipartUpload,
    CompleteMultipartUploadOutput,
    ConditionalRequestConflict,
    ConfirmRemoveSelfBucketAccess,
    ContentMD5,
    CopyObjectOutput,
    CopyObjectRequest,
    CopyObjectResult,
    CopyPartResult,
    CORSConfiguration,
    CreateBucketOutput,
    CreateBucketRequest,
    CreateMultipartUploadOutput,
    CreateMultipartUploadRequest,
    CrossLocationLoggingProhibitted,
    Delete,
    DeletedObject,
    DeleteMarkerEntry,
    DeleteObjectOutput,
    DeleteObjectsOutput,
    DeleteObjectTaggingOutput,
    Delimiter,
    EncodingType,
    Error,
    Expiration,
    FetchOwner,
    GetBucketAccelerateConfigurationOutput,
    GetBucketAclOutput,
    GetBucketAnalyticsConfigurationOutput,
    GetBucketCorsOutput,
    GetBucketEncryptionOutput,
    GetBucketIntelligentTieringConfigurationOutput,
    GetBucketInventoryConfigurationOutput,
    GetBucketLifecycleConfigurationOutput,
    GetBucketLocationOutput,
    GetBucketLoggingOutput,
    GetBucketMetricsConfigurationOutput,
    GetBucketOwnershipControlsOutput,
    GetBucketPolicyOutput,
    GetBucketPolicyStatusOutput,
    GetBucketReplicationOutput,
    GetBucketRequestPaymentOutput,
    GetBucketTaggingOutput,
    GetBucketVersioningOutput,
    GetBucketWebsiteOutput,
    GetObjectAclOutput,
    GetObjectAttributesOutput,
    GetObjectAttributesParts,
    GetObjectAttributesRequest,
    GetObjectLegalHoldOutput,
    GetObjectLockConfigurationOutput,
    GetObjectOutput,
    GetObjectRequest,
    GetObjectRetentionOutput,
    GetObjectTaggingOutput,
    GetObjectTorrentOutput,
    GetPublicAccessBlockOutput,
    HeadBucketOutput,
    HeadObjectOutput,
    HeadObjectRequest,
    IfMatch,
    IfMatchInitiatedTime,
    IfMatchLastModifiedTime,
    IfMatchSize,
    IfNoneMatch,
    IntelligentTieringConfiguration,
    IntelligentTieringId,
    InvalidArgument,
    InvalidBucketName,
    InvalidDigest,
    InvalidObjectState,
    InvalidPartNumber,
    InvalidPartOrder,
    InvalidStorageClass,
    InvalidTargetBucketForLogging,
    InventoryConfiguration,
    InventoryId,
    KeyMarker,
    LifecycleRules,
    ListBucketAnalyticsConfigurationsOutput,
    ListBucketIntelligentTieringConfigurationsOutput,
    ListBucketInventoryConfigurationsOutput,
    ListBucketMetricsConfigurationsOutput,
    ListBucketsOutput,
    ListMultipartUploadsOutput,
    ListObjectsOutput,
    ListObjectsV2Output,
    ListObjectVersionsOutput,
    ListPartsOutput,
    Marker,
    MaxBuckets,
    MaxKeys,
    MaxParts,
    MaxUploads,
    MethodNotAllowed,
    MetricsConfiguration,
    MetricsId,
    MissingSecurityHeader,
    MpuObjectSize,
    MultipartUpload,
    MultipartUploadId,
    NoSuchBucket,
    NoSuchBucketPolicy,
    NoSuchCORSConfiguration,
    NoSuchKey,
    NoSuchLifecycleConfiguration,
    NoSuchPublicAccessBlockConfiguration,
    NoSuchTagSet,
    NoSuchUpload,
    NoSuchWebsiteConfiguration,
    NotificationConfiguration,
    Object,
    ObjectIdentifier,
    ObjectKey,
    ObjectLockConfiguration,
    ObjectLockConfigurationNotFoundError,
    ObjectLockEnabled,
    ObjectLockLegalHold,
    ObjectLockMode,
    ObjectLockRetention,
    ObjectLockToken,
    ObjectOwnership,
    ObjectPart,
    ObjectVersion,
    ObjectVersionId,
    ObjectVersionStorageClass,
    OptionalObjectAttributesList,
    Owner,
    OwnershipControls,
    OwnershipControlsNotFoundError,
    Part,
    PartNumber,
    PartNumberMarker,
    Policy,
    PostResponse,
    PreconditionFailed,
    Prefix,
    PublicAccessBlockConfiguration,
    PutBucketAclRequest,
    PutBucketLifecycleConfigurationOutput,
    PutObjectAclOutput,
    PutObjectAclRequest,
    PutObjectLegalHoldOutput,
    PutObjectLockConfigurationOutput,
    PutObjectOutput,
    PutObjectRequest,
    PutObjectRetentionOutput,
    PutObjectTaggingOutput,
    ReplicationConfiguration,
    ReplicationConfigurationNotFoundError,
    RequestPayer,
    RequestPaymentConfiguration,
    RestoreObjectOutput,
    RestoreRequest,
    S3Api,
    ServerSideEncryption,
    ServerSideEncryptionConfiguration,
    SkipValidation,
    SSECustomerAlgorithm,
    SSECustomerKey,
    SSECustomerKeyMD5,
    StartAfter,
    StorageClass,
    Tagging,
    TagSet,
    Token,
    TransitionDefaultMinimumObjectSize,
    UploadIdMarker,
    UploadPartCopyOutput,
    UploadPartCopyRequest,
    UploadPartOutput,
    UploadPartRequest,
    VersionIdMarker,
    VersioningConfiguration,
    WebsiteConfiguration,
)
from localstack.aws.api.s3 import NotImplemented as NotImplementedException
from localstack.aws.handlers import (
    modify_service_response,
    preprocess_request,
    serve_custom_service_request_handlers,
)
from localstack.constants import AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1
from localstack.services.edge import ROUTER
from localstack.services.plugins import ServiceLifecycleHook
from localstack.services.s3.codec import AwsChunkedDecoder
from localstack.services.s3.constants import (
    ALLOWED_HEADER_OVERRIDES,
    ARCHIVES_STORAGE_CLASSES,
    CHECKSUM_ALGORITHMS,
    DEFAULT_BUCKET_ENCRYPTION,
    S3_HOST_ID,
)
from localstack.services.s3.cors import S3CorsHandler, s3_cors_request_handler
from localstack.services.s3.exceptions import (
    InvalidBucketOwnerAWSAccountID,
    InvalidBucketState,
    InvalidRequest,
    MalformedPolicy,
    MalformedXML,
    NoSuchConfiguration,
    NoSuchObjectLockConfiguration,
    TooManyConfigurations,
    UnexpectedContent,
)
from localstack.services.s3.models import (
    BucketCorsIndex,
    EncryptionParameters,
    ObjectLockParameters,
    S3Bucket,
    S3DeleteMarker,
    S3Multipart,
    S3Object,
    S3Part,
    S3Store,
    VersionedKeyStore,
    s3_stores,
)
from localstack.services.s3.notifications import NotificationDispatcher, S3EventNotificationContext
from localstack.services.s3.presigned_url import validate_post_policy
from localstack.services.s3.storage.core import LimitedIterableStream, S3ObjectStore
from localstack.services.s3.storage.ephemeral import EphemeralS3ObjectStore
from localstack.services.s3.utils import (
    ObjectRange,
    add_expiration_days_to_datetime,
    base_64_content_md5_to_etag,
    create_redirect_for_post_request,
    create_s3_kms_managed_key_for_region,
    decode_user_metadata,
    encode_user_metadata,
    etag_to_base_64_content_md5,
    extract_bucket_key_version_id_from_copy_source,
    generate_safe_version_id,
    get_bucket_location_xml,
    get_canned_acl,
    get_class_attrs_from_spec_class,
    get_failed_precondition_copy_source,
    get_failed_upload_part_copy_source_preconditions,
    get_full_default_bucket_location,
    get_kms_key_arn,
    get_lifecycle_rule_from_object,
    get_owner_for_account_id,
    get_permission_from_header,
    get_retention_from_now,
    get_s3_checksum_algorithm_from_request,
    get_s3_checksum_algorithm_from_trailing_headers,
    get_system_metadata_from_request,
    get_unique_key_id,
    get_url_encoded_object_location,
    header_name_from_capitalized_param,
    is_bucket_name_valid,
    is_version_older_than_other,
    parse_copy_source_range_header,
    parse_post_object_tagging_xml,
    parse_range_header,
    parse_tagging_header,
    s3_response_handler,
    serialize_expiration_header,
    str_to_rfc_1123_datetime,
    validate_dict_fields,
    validate_failed_precondition,
    validate_kms_key_id,
    validate_location_constraint,
    validate_tag_set,
)
from localstack.services.s3.validation import (
    parse_grants_in_headers,
    validate_acl_acp,
    validate_bucket_analytics_configuration,
    validate_bucket_intelligent_tiering_configuration,
    validate_canned_acl,
    validate_checksum_value,
    validate_cors_configuration,
    validate_inventory_configuration,
    validate_lifecycle_configuration,
    validate_object_key,
    validate_sse_c,
    validate_website_configuration,
)
from localstack.services.s3.website_hosting import register_website_hosting_routes
from localstack.state import AssetDirectory, StateVisitor
from localstack.utils.aws.arns import s3_bucket_name
from localstack.utils.aws.aws_stack import get_valid_regions_for_service
from localstack.utils.collections import select_from_typed_dict
from localstack.utils.strings import short_uid, to_bytes, to_str

LOG = logging.getLogger(__name__)

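# valid values extracted from the API spec classes, used to validate request parameters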
STORAGE_CLASSES = get_class_attrs_from_spec_class(StorageClass)
SSE_ALGORITHMS = get_class_attrs_from_spec_class(ServerSideEncryption)
OBJECT_OWNERSHIPS = get_class_attrs_from_spec_class(ObjectOwnership)
OBJECT_LOCK_MODES = get_class_attrs_from_spec_class(ObjectLockMode)

DEFAULT_S3_TMP_DIR = "/tmp/localstack-s3-storage"


class S3Provider(S3Api, ServiceLifecycleHook):
    def __init__(self, storage_backend: S3ObjectStore = None) -> None:
        super().__init__()
        self._storage_backend = storage_backend or EphemeralS3ObjectStore(DEFAULT_S3_TMP_DIR)
        self._notification_dispatcher = NotificationDispatcher()
        self._cors_handler = S3CorsHandler(BucketCorsIndex())
        # TODO: add lock for keys for PutObject, only way to support precondition writes for versioned buckets
        self._preconditions_locks = defaultdict(lambda: defaultdict(RLock))

        # runtime cache of Lifecycle Expiration headers, as they need to be calculated everytime we fetch an object
        # in case the rules have changed
        self._expiration_cache: dict[BucketName, dict[ObjectKey, Expiration]] = defaultdict(dict)

    def on_after_init(self):
        preprocess_request.append(self._cors_handler)
        serve_custom_service_request_handlers.append(s3_cors_request_handler)
        modify_service_response.append(self.service, s3_response_handler)
        register_website_hosting_routes(router=ROUTER)

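    # state/persistence hooks: expose the S3 stores and the storage directory to the state
    # visitor, flush the storage backend before a snapshot is saved, and invalidate the CORS
    # cache after a state reset or load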
    def accept_state_visitor(self, visitor: StateVisitor):
        visitor.visit(s3_stores)
        visitor.visit(AssetDirectory(self.service, self._storage_backend.root_directory))

    def on_before_state_save(self):
        self._storage_backend.flush()

    def on_after_state_reset(self):
        self._cors_handler.invalidate_cache()

    def on_after_state_load(self):
        self._cors_handler.invalidate_cache()

    def on_before_stop(self):
        self._notification_dispatcher.shutdown()
        self._storage_backend.close()

    def _notify(
        self,
        context: RequestContext,
        s3_bucket: S3Bucket,
        s3_object: S3Object | S3DeleteMarker = None,
        s3_notif_ctx: S3EventNotificationContext = None,
    ):
        """
        :param context: the RequestContext, to retrieve more information about the incoming notification
        :param s3_bucket: the S3Bucket object
        :param s3_object: the S3Object object if S3EventNotificationContext is not given
        :param s3_notif_ctx: S3EventNotificationContext, in case we need specific data only available in the API call
        :return:
        """
        if s3_bucket.notification_configuration:
            if not s3_notif_ctx:
                s3_notif_ctx = S3EventNotificationContext.from_request_context(
                    context,
                    s3_bucket=s3_bucket,
                    s3_object=s3_object,
                )

            self._notification_dispatcher.send_notifications(
                s3_notif_ctx, s3_bucket.notification_configuration
            )

    def _verify_notification_configuration(
        self,
        notification_configuration: NotificationConfiguration,
        skip_destination_validation: SkipValidation,
        context: RequestContext,
        bucket_name: str,
    ):
        self._notification_dispatcher.verify_configuration(
            notification_configuration, skip_destination_validation, context, bucket_name
        )

    def _get_expiration_header(
        self,
        lifecycle_rules: LifecycleRules,
        bucket: BucketName,
        s3_object: S3Object,
        object_tags: dict[str, str],
    ) -> Expiration:
        """
        This method will check if the key matches a Lifecycle filter, and return the serializer header if that's
        the case. We're caching it because it can change depending on the set rules on the bucket.
        We can't use `lru_cache` as the parameters needs to be hashable
        :param lifecycle_rules: the bucket LifecycleRules
        :param s3_object: S3Object
        :param object_tags: the object tags
        :return: the Expiration header if there's a rule matching
        """
        if cached_exp := self._expiration_cache.get(bucket, {}).get(s3_object.key):
            return cached_exp

        if lifecycle_rule := get_lifecycle_rule_from_object(
            lifecycle_rules, s3_object.key, s3_object.size, object_tags
        ):
            expiration_header = serialize_expiration_header(
                lifecycle_rule["ID"],
                lifecycle_rule["Expiration"],
                s3_object.last_modified,
            )
            self._expiration_cache[bucket][s3_object.key] = expiration_header
            return expiration_header

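    # resolve a bucket that may live in another account: fall back to the global bucket map
    # when the bucket is not in the caller's store, and enforce the expected bucket owner
    # parameter when it is provided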
    def _get_cross_account_bucket(
        self,
        context: RequestContext,
        bucket_name: BucketName,
        *,
        expected_bucket_owner: AccountId = None,
    ) -> tuple[S3Store, S3Bucket]:
        if expected_bucket_owner and not re.fullmatch(r"\w{12}", expected_bucket_owner):
            raise InvalidBucketOwnerAWSAccountID(
                f"The value of the expected bucket owner parameter must be an AWS Account ID... [{expected_bucket_owner}]",
            )

        store = self.get_store(context.account_id, context.region)
        if not (s3_bucket := store.buckets.get(bucket_name)):
            if not (account_id := store.global_bucket_map.get(bucket_name)):
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

            store = self.get_store(account_id, context.region)
            if not (s3_bucket := store.buckets.get(bucket_name)):
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

        if expected_bucket_owner and s3_bucket.bucket_account_id != expected_bucket_owner:
            raise AccessDenied("Access Denied")

        return store, s3_bucket

    def _create_bucket_tags(self, resource_arn: str, account_id: str, region: str, tags: TagSet):
        store = self.get_store(account_id, region)
        store.TAGS.tag_resource(arn=resource_arn, tags=tags)

    def _remove_all_bucket_tags(self, resource_arn: str, account_id: str, region: str):
        store = self.get_store(account_id, region)
        store.TAGS.tags.pop(resource_arn, None)

    def _list_bucket_tags(self, resource_arn: str, account_id: str, region: str) -> TagSet:
        store = self.get_store(account_id, region)
        tags = store.TAGS.list_tags_for_resource(resource_arn)["Tags"]
        return tags

    @staticmethod
    def get_store(account_id: str, region_name: str) -> S3Store:
        # Use default account id for external access? would need an anonymous one
        return s3_stores[account_id][region_name]

    @handler("CreateBucket", expand=False)
    def create_bucket(
        self,
        context: RequestContext,
        request: CreateBucketRequest,
    ) -> CreateBucketOutput:
        if context.region == "aws-global":
            # TODO: extend this logic to probably all the provider, and maybe all services. S3 is the most impacted
            #  right now so this will help users to properly set a region in their config
            # See the `TestS3.test_create_bucket_aws_global` test
            raise AuthorizationHeaderMalformed(
                f"The authorization header is malformed; the region 'aws-global' is wrong; expecting '{AWS_REGION_US_EAST_1}'",
                HostId=S3_HOST_ID,
                Region=AWS_REGION_US_EAST_1,
            )

        bucket_name = request["Bucket"]

        if not is_bucket_name_valid(bucket_name):
            raise InvalidBucketName("The specified bucket is not valid.", BucketName=bucket_name)

        create_bucket_configuration = request.get("CreateBucketConfiguration") or {}

        bucket_tags = create_bucket_configuration.get("Tags", [])
        if bucket_tags:
            validate_tag_set(bucket_tags, type_set="create-bucket")

        location_constraint = create_bucket_configuration.get("LocationConstraint", "")
        validate_location_constraint(context.region, location_constraint)

        bucket_region = location_constraint
        if not location_constraint:
            bucket_region = AWS_REGION_US_EAST_1
        if location_constraint == BucketLocationConstraint.EU:
            bucket_region = AWS_REGION_EU_WEST_1

        store = self.get_store(context.account_id, bucket_region)

        if bucket_name in store.global_bucket_map:
            existing_bucket_owner = store.global_bucket_map[bucket_name]
            if existing_bucket_owner != context.account_id:
                raise BucketAlreadyExists()

            # if the existing bucket has the same owner, the behaviour will depend on the region and if the request has
            # tags
            if bucket_region != AWS_REGION_US_EAST_1 or bucket_tags:
                raise BucketAlreadyOwnedByYou(
                    "Your previous request to create the named bucket succeeded and you already own it.",
                    BucketName=bucket_name,
                )
            else:
                # CreateBucket is idempotent in us-east-1
                return CreateBucketOutput(Location=f"/{bucket_name}")

        if (
            object_ownership := request.get("ObjectOwnership")
        ) is not None and object_ownership not in OBJECT_OWNERSHIPS:
            raise InvalidArgument(
                f"Invalid x-amz-object-ownership header: {object_ownership}",
                ArgumentName="x-amz-object-ownership",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        owner = get_owner_for_account_id(context.account_id)
        acl = get_access_control_policy_for_new_resource_request(request, owner=owner)

        s3_bucket = S3Bucket(
            name=bucket_name,
            account_id=context.account_id,
            bucket_region=bucket_region,
            owner=owner,
            acl=acl,
            object_ownership=request.get("ObjectOwnership"),
            object_lock_enabled_for_bucket=request.get("ObjectLockEnabledForBucket"),
            location_constraint=location_constraint,
        )

        store.buckets[bucket_name] = s3_bucket
        store.global_bucket_map[bucket_name] = s3_bucket.bucket_account_id
        if bucket_tags:
            self._create_bucket_tags(
                s3_bucket.bucket_arn, context.account_id, bucket_region, bucket_tags
            )
        self._cors_handler.invalidate_cache()
        self._storage_backend.create_bucket(bucket_name)

        # Location is always contained in response -> full url for LocationConstraint outside us-east-1
        location = (
            f"/{bucket_name}"
            if bucket_region == "us-east-1"
            else get_full_default_bucket_location(bucket_name)
        )
        response = CreateBucketOutput(Location=location)
        return response

    def delete_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # the bucket still contains objects
        if not s3_bucket.objects.is_empty():
            message = "The bucket you tried to delete is not empty"
            if s3_bucket.versioning_status:
                message += ". You must delete all versions in the bucket."
            raise BucketNotEmpty(
                message,
                BucketName=bucket,
            )

        store.buckets.pop(bucket)
        store.global_bucket_map.pop(bucket)
        self._cors_handler.invalidate_cache()
        self._expiration_cache.pop(bucket, None)
        self._preconditions_locks.pop(bucket, None)
        # clean up the storage backend
        self._storage_backend.delete_bucket(bucket)
        self._remove_all_bucket_tags(
            s3_bucket.bucket_arn, context.account_id, s3_bucket.bucket_region
        )

    def list_buckets(
        self,
        context: RequestContext,
        max_buckets: MaxBuckets = None,
        continuation_token: Token = None,
        prefix: Prefix = None,
        bucket_region: BucketRegion = None,
        **kwargs,
    ) -> ListBucketsOutput:
        if bucket_region and not config.ALLOW_NONSTANDARD_REGIONS:
            if bucket_region not in get_valid_regions_for_service(self.service):
                raise InvalidArgument(
                    f"Argument value {bucket_region} is not a valid AWS Region",
                    ArgumentName="bucket-region",
                )

        owner = get_owner_for_account_id(context.account_id)
        store = self.get_store(context.account_id, context.region)

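        # the continuation token is the url-safe base64-encoded name of the bucket at which
        # the previous page stopped; buckets sorting before it are skipped below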
        decoded_continuation_token = (
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
            if continuation_token
            else None
        )

        count = 0
        buckets: list[Bucket] = []
        next_continuation_token = None

        # Comparing strings with case sensitivity since AWS is case-sensitive
        for bucket in sorted(store.buckets.values(), key=lambda r: r.name):
            if continuation_token and bucket.name < decoded_continuation_token:
                continue

            if prefix and not bucket.name.startswith(prefix):
                continue

            if bucket_region and not bucket.bucket_region == bucket_region:
                continue

            if max_buckets and count >= max_buckets:
                next_continuation_token = to_str(base64.urlsafe_b64encode(bucket.name.encode()))
                break

            output_bucket = Bucket(
                Name=bucket.name,
                CreationDate=bucket.creation_date,
                BucketRegion=bucket.bucket_region,
            )
            buckets.append(output_bucket)
            count += 1

        return ListBucketsOutput(
            Owner=owner, Buckets=buckets, Prefix=prefix, ContinuationToken=next_continuation_token
        )

    def head_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> HeadBucketOutput:
        if context.region == "aws-global":
            # TODO: extend this logic to probably all the provider, and maybe all services. S3 is the most impacted
            #  right now so this will help users to properly set a region in their config
            # See the `TestS3.test_create_bucket_aws_global` test
            raise AuthorizationHeaderMalformed(
                f"The authorization header is malformed; the region 'aws-global' is wrong; expecting '{AWS_REGION_US_EAST_1}'",
                HostId=S3_HOST_ID,
                Region=AWS_REGION_US_EAST_1,
            )

        store = self.get_store(context.account_id, context.region)
        if not (s3_bucket := store.buckets.get(bucket)):
            if not (account_id := store.global_bucket_map.get(bucket)):
                # just to return the 404 error message
                raise NoSuchBucket()

            store = self.get_store(account_id, context.region)
            if not (s3_bucket := store.buckets.get(bucket)):
                # just to return the 404 error message
                raise NoSuchBucket()

        # TODO: this call is also used to check if the user has access/authorization for the bucket
        #  it can return 403
        return HeadBucketOutput(BucketRegion=s3_bucket.bucket_region)

    def get_bucket_location(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLocationOutput:
        """
        When implementing the ASF provider, this operation is implemented because:
        - The spec defines a root element GetBucketLocationOutput containing a LocationConstraint member, where
          S3 actually just returns the LocationConstraint on the root level (only operation so far that we know of).
        - We circumvent the root level element here by patching the spec such that this operation returns a
          single "payload" (the XML body response), which causes the serializer to directly take the payload element.
        - The above "hack" causes the fix in the serializer to not be picked up here as we're passing the XML body as
          the payload, which is why we need to manually do this here by manipulating the string.
        Botocore implements this hack for parsing the response in `botocore.handlers.py#parse_get_bucket_location`
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return GetBucketLocationOutput(
            LocationConstraint=get_bucket_location_xml(s3_bucket.location_constraint)
        )

    @handler("PutObject", expand=False)
    def put_object(
        self,
        context: RequestContext,
        request: PutObjectRequest,
    ) -> PutObjectOutput:
        # TODO: validate order of validation
        # TODO: still need to handle following parameters
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        key = request["Key"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        if (storage_class := request.get("StorageClass")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, s3_bucket)

        validate_object_key(key)

        if_match = request.get("IfMatch")
        if (if_none_match := request.get("IfNoneMatch")) and if_match:
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header="If-Match,If-None-Match",
                additionalMessage="Multiple conditional request headers present in the request",
            )

        elif (if_none_match and if_none_match != "*") or (if_match and if_match == "*"):
            header_name = "If-None-Match" if if_none_match else "If-Match"
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header=header_name,
                additionalMessage=f"We don't accept the provided value of {header_name} header for this API",
            )

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        user_metadata = decode_user_metadata(request.get("Metadata"))

        version_id = generate_version_id(s3_bucket.versioning_status)
        if version_id != "null":
            # if we are in a versioned bucket, we need to lock around the full key (all the versions)
            # because object versions have locks per version
            precondition_lock = self._preconditions_locks[bucket_name][key]
        else:
            precondition_lock = contextlib.nullcontext()

        etag_content_md5 = ""
        if content_md5 := request.get("ContentMD5"):
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
            etag_content_md5 = base_64_content_md5_to_etag(content_md5)
            if not etag_content_md5:
                raise InvalidDigest(
                    "The Content-MD5 you specified was invalid.",
                    Content_MD5=content_md5,
                )

        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
        checksum_value = (
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
        )

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
            server_side_encryption=request.get("ServerSideEncryption"),
        )

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            s3_bucket,
            store,
        )

        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)

        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging)

        s3_object = S3Object(
            key=key,
            version_id=version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            sse_key_hash=sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            acl=acl,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
        )

        body = request.get("Body")
        # check if chunked request
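        # (SigV4 streaming uploads use the `aws-chunked` content encoding: the payload has to
        # be decoded, and trailing headers may carry the checksum)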
        headers = context.request.headers
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
            "STREAMING-"
        ) or "aws-chunked" in headers.get("content-encoding", "")
        if is_aws_chunked:
            checksum_algorithm = (
                checksum_algorithm
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
            )
            if checksum_algorithm:
                s3_object.checksum_algorithm = checksum_algorithm

            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_object=s3_object)

            # S3 removes the `aws-chunked` value from ContentEncoding
            if content_encoding := s3_object.system_metadata.pop("ContentEncoding", None):
                encodings = [enc for enc in content_encoding.split(",") if enc != "aws-chunked"]
                if encodings:
                    s3_object.system_metadata["ContentEncoding"] = ",".join(encodings)

        with (
            precondition_lock,
            self._storage_backend.open(bucket_name, s3_object, mode="w") as s3_stored_object,
        ):
            # as we are inside the lock here, if multiple concurrent requests happen for the same object, it's the first
            # one to finish to succeed, and subsequent will raise exceptions. Once the first write finishes, we're
            # opening the lock and other requests can check this condition
            if if_none_match and object_exists_for_precondition_write(s3_bucket, key):
                raise PreconditionFailed(
                    "At least one of the pre-conditions you specified did not hold",
                    Condition="If-None-Match",
                )

            elif if_match:
                verify_object_equality_precondition_write(s3_bucket, key, if_match)

            s3_stored_object.write(body)

            if s3_object.checksum_algorithm:
                if not s3_object.checksum_value:
                    s3_object.checksum_value = s3_stored_object.checksum
                elif not validate_checksum_value(s3_object.checksum_value, checksum_algorithm):
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise InvalidRequest(
                        f"Value for x-amz-checksum-{s3_object.checksum_algorithm.lower()} header is invalid."
                    )
                elif s3_object.checksum_value != s3_stored_object.checksum:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise BadDigest(
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
                    )

            # TODO: handle ContentMD5 and ChecksumAlgorithm in a handler for all requests except requests with a
            #  streaming body. We can use the specs to verify which operations needs to have the checksum validated
            if content_md5:
                calculated_md5 = etag_to_base_64_content_md5(s3_stored_object.etag)
                if calculated_md5 != content_md5:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise BadDigest(
                        "The Content-MD5 you specified did not match what we received.",
                        ExpectedDigest=etag_content_md5,
                        CalculatedDigest=calculated_md5,
                    )

            s3_bucket.objects.set(key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket_name, key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        # RequestCharged: Optional[RequestCharged]  # TODO
        response = PutObjectOutput(
            ETag=s3_object.quoted_etag,
        )
        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm}"] = s3_object.checksum_value
            response["ChecksumType"] = s3_object.checksum_type

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response

    @handler("GetObject", expand=False)
    def get_object(
        self,
        context: RequestContext,
        request: GetObjectRequest,
    ) -> GetObjectOutput:
        # TODO: missing handling parameters:
        #  request_payer: RequestPayer = None,
        #  expected_bucket_owner: AccountId = None,

        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="GET",
        )

        if s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not s3_object.restore:
            raise InvalidObjectState(
                "The operation is not valid for the object's storage class",
                StorageClass=s3_object.storage_class,
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and s3_object.kms_key_id:
            validate_kms_key_id(kms_key=s3_object.kms_key_id, bucket=s3_bucket)

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        if s3_object.sse_key_hash:
            if s3_object.sse_key_hash and not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif s3_object.sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

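        # Range and partNumber are mutually exclusive; a part number is resolved to the byte
        # range of that part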
        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        # we deliberately do not call `.close()` on the s3_stored_object to keep the read lock acquired. When passing
        # the object to Werkzeug, the handler will call `.close()` after finishing iterating over `__iter__`.
        # this can however lead to deadlocks if an exception happens between the call and returning the object.
        # Be careful into adding validation between this call and `return` of `S3Provider.get_object`
        s3_stored_object = self._storage_backend.open(bucket_name, s3_object, mode="r")

        # this is a hacky way to verify the object hasn't been modified between `s3_object = s3_bucket.get_object`
        # and the storage backend call. If it has been modified, now that we're in the read lock, we can safely fetch
        # the object again
        if s3_stored_object.last_modified != s3_object.internal_last_modified:
            s3_object = s3_bucket.get_object(
                key=object_key,
                version_id=version_id,
                http_method="GET",
            )

        response = GetObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = encode_user_metadata(s3_object.user_metadata)

        if s3_object.parts and request.get("PartNumber"):
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        checksum_value = None
        checksum_type = None
        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                checksum_value = s3_object.checksum_value
                checksum_type = s3_object.checksum_type

        if range_data:
            s3_stored_object.seek(range_data.begin)
            response["Body"] = LimitedIterableStream(
                s3_stored_object, max_length=range_data.content_length
            )
            response["ContentRange"] = range_data.content_range
            response["ContentLength"] = range_data.content_length
            response["StatusCode"] = 206
            if checksum_value:
                if s3_object.parts and part_number and checksum_type == ChecksumType.COMPOSITE:
                    part_data = s3_object.parts[str(part_number)]
                    checksum_key = f"Checksum{checksum_algorithm.upper()}"
                    response[checksum_key] = part_data.get(checksum_key)
                    response["ChecksumType"] = ChecksumType.COMPOSITE

                # it means either the range header means the whole object, or that a multipart upload with `FULL_OBJECT`
                # only had one part
                elif range_data.content_length == s3_object.size:
                    response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                    response["ChecksumType"] = checksum_type
        else:
            response["Body"] = s3_stored_object
            if checksum_value:
                response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                response["ChecksumType"] = checksum_type

        add_encryption_to_response(response, s3_object=s3_object)

        if object_tags := store.TAGS.tags.get(
            get_unique_key_id(bucket_name, object_key, version_id)
        ):
            response["TagCount"] = len(object_tags)

        if s3_object.is_current and s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        # TODO: missing returned fields
        #     RequestCharged: Optional[RequestCharged]
        #     ReplicationStatus: Optional[ReplicationStatus]

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

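        # echo allowed response-header overrides from the request into the response, rejecting
        # values that cannot be encoded as ISO-8859-1 (latin-1)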
        for request_param, response_param in ALLOWED_HEADER_OVERRIDES.items():
1✔
1110
            if request_param_value := request.get(request_param):
1✔
1111
                if isinstance(request_param_value, str):
1✔
1112
                    try:
1✔
1113
                        request_param_value.encode("latin-1")
1✔
1114
                    except UnicodeEncodeError:
1✔
1115
                        raise InvalidArgument(
1✔
1116
                            "Header value cannot be represented using ISO-8859-1.",
1117
                            ArgumentName=header_name_from_capitalized_param(request_param),
1118
                            ArgumentValue=request_param_value,
1119
                            HostId=S3_HOST_ID,
1120
                        )
1121

1122
                response[response_param] = request_param_value
1✔
1123

1124
        return response
1✔
1125

1126
    @handler("HeadObject", expand=False)
1✔
1127
    def head_object(
1✔
1128
        self,
1129
        context: RequestContext,
1130
        request: HeadObjectRequest,
1131
    ) -> HeadObjectOutput:
1132
        bucket_name = request["Bucket"]
1✔
1133
        object_key = request["Key"]
1✔
1134
        version_id = request.get("VersionId")
1✔
1135
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
1136

1137
        s3_object = s3_bucket.get_object(
1✔
1138
            key=object_key,
1139
            version_id=version_id,
1140
            http_method="HEAD",
1141
        )
1142

1143
        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)
1✔
1144

1145
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
1146
        if s3_object.sse_key_hash:
1✔
1147
            if not sse_c_key_md5:
1✔
UNCOV
1148
                raise InvalidRequest(
×
1149
                    "The object was stored using a form of Server Side Encryption. "
1150
                    "The correct parameters must be provided to retrieve the object."
1151
                )
1152
            elif s3_object.sse_key_hash != sse_c_key_md5:
1✔
1153
                raise AccessDenied(
1✔
1154
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
1155
                )
1156

1157
        validate_sse_c(
1✔
1158
            algorithm=request.get("SSECustomerAlgorithm"),
1159
            encryption_key=request.get("SSECustomerKey"),
1160
            encryption_key_md5=sse_c_key_md5,
1161
        )
1162

1163
        response = HeadObjectOutput(
1✔
1164
            AcceptRanges="bytes",
1165
            **s3_object.get_system_metadata_fields(),
1166
        )
1167
        if s3_object.user_metadata:
1✔
1168
            response["Metadata"] = encode_user_metadata(s3_object.user_metadata)
1✔
1169

1170
        checksum_value = None
1✔
1171
        checksum_type = None
1✔
1172
        if checksum_algorithm := s3_object.checksum_algorithm:
1✔
1173
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
1✔
1174
                checksum_value = s3_object.checksum_value
1✔
1175
                checksum_type = s3_object.checksum_type
1✔
1176

1177
        if s3_object.parts and request.get("PartNumber"):
1✔
1178
            response["PartsCount"] = len(s3_object.parts)
1✔
1179

1180
        if s3_object.version_id:
1✔
1181
            response["VersionId"] = s3_object.version_id
1✔
1182

1183
        if s3_object.website_redirect_location:
1✔
1184
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location
1✔
1185

1186
        if s3_object.restore:
1✔
1187
            response["Restore"] = s3_object.restore
1✔
1188

1189
        range_header = request.get("Range")
1✔
1190
        part_number = request.get("PartNumber")
1✔
1191
        if range_header and part_number:
1✔
UNCOV
1192
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
×
1193
        range_data = None
1✔
1194
        if range_header:
1✔
UNCOV
1195
            range_data = parse_range_header(range_header, s3_object.size)
×
1196
        elif part_number:
1✔
1197
            range_data = get_part_range(s3_object, part_number)
1✔
1198

1199
        if range_data:
1✔
1200
            response["ContentLength"] = range_data.content_length
1✔
1201
            response["ContentRange"] = range_data.content_range
1✔
1202
            response["StatusCode"] = 206
1✔
1203
            if checksum_value:
1✔
1204
                if s3_object.parts and part_number and checksum_type == ChecksumType.COMPOSITE:
1✔
1205
                    part_data = s3_object.parts[str(part_number)]
1✔
1206
                    checksum_key = f"Checksum{checksum_algorithm.upper()}"
1✔
1207
                    response[checksum_key] = part_data.get(checksum_key)
1✔
1208
                    response["ChecksumType"] = ChecksumType.COMPOSITE
1✔
1209

1210
                # it means either the range header means the whole object, or that a multipart upload with `FULL_OBJECT`
1211
                # only had one part
1212
                elif range_data.content_length == s3_object.size:
1✔
1213
                    response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
1✔
1214
                    response["ChecksumType"] = checksum_type
1✔
1215
        elif checksum_value:
1✔
1216
            response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
1✔
1217
            response["ChecksumType"] = checksum_type
1✔
1218

1219
        add_encryption_to_response(response, s3_object=s3_object)
1✔
1220
        object_tags = store.TAGS.tags.get(
1✔
1221
            get_unique_key_id(bucket_name, object_key, s3_object.version_id)
1222
        )
1223
        if object_tags:
1✔
1224
            response["TagCount"] = len(object_tags)
1✔
1225

1226
        # if you specify the VersionId, AWS won't return the Expiration header, even if that's the current version
1227
        if not version_id and s3_bucket.lifecycle_rules:
1✔
1228
            if expiration_header := self._get_expiration_header(
1✔
1229
                s3_bucket.lifecycle_rules,
1230
                bucket_name,
1231
                s3_object,
1232
                object_tags,
1233
            ):
1234
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
1235
                #  apply them everytime we get/head an object
1236
                response["Expiration"] = expiration_header
1✔
1237

1238
        if s3_object.lock_mode:
1✔
1239
            response["ObjectLockMode"] = s3_object.lock_mode
1✔
1240
            if s3_object.lock_until:
1✔
1241
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
1✔
1242
        if s3_object.lock_legal_status:
1✔
1243
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status
1✔
1244

1245
        if sse_c_key_md5:
1✔
1246
            response["SSECustomerAlgorithm"] = "AES256"
1✔
1247
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
1248

1249
        # TODO: missing return fields:
1250
        #  ArchiveStatus: Optional[ArchiveStatus]
1251
        #  RequestCharged: Optional[RequestCharged]
1252
        #  ReplicationStatus: Optional[ReplicationStatus]
1253

1254
        return response
1✔
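A minimal sketch of how a client could exercise the Range handling above, assuming boto3 against LocalStack's default edge endpoint (http://localhost:4566) with dummy credentials; the bucket and key names are illustrative and not part of this source.

import boto3

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)

s3.create_bucket(Bucket="demo-bucket")
s3.put_object(Bucket="demo-bucket", Key="data.bin", Body=b"0" * 1024)

# a ranged GET comes back as 206 Partial Content with ContentRange set
ranged = s3.get_object(Bucket="demo-bucket", Key="data.bin", Range="bytes=0-99")
assert ranged["ResponseMetadata"]["HTTPStatusCode"] == 206
assert ranged["ContentLength"] == 100
# supplying both Range and PartNumber on the same request is rejected, as in the check above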
1255

1256
    def delete_object(
1✔
1257
        self,
1258
        context: RequestContext,
1259
        bucket: BucketName,
1260
        key: ObjectKey,
1261
        mfa: MFA = None,
1262
        version_id: ObjectVersionId = None,
1263
        request_payer: RequestPayer = None,
1264
        bypass_governance_retention: BypassGovernanceRetention = None,
1265
        expected_bucket_owner: AccountId = None,
1266
        if_match: IfMatch = None,
1267
        if_match_last_modified_time: IfMatchLastModifiedTime = None,
1268
        if_match_size: IfMatchSize = None,
1269
        **kwargs,
1270
    ) -> DeleteObjectOutput:
1271
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1272

1273
        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
1✔
1274
            raise InvalidArgument(
1✔
1275
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
1276
                ArgumentName="x-amz-bypass-governance-retention",
1277
            )
1278

1279
        # TODO: this is only supported for Directory Buckets
1280
        non_supported_precondition = None
1✔
1281
        if if_match:
1✔
1282
            non_supported_precondition = "If-Match"
1✔
1283
        if if_match_size:
1✔
1284
            non_supported_precondition = "x-amz-if-match-size"
1✔
1285
        if if_match_last_modified_time:
1✔
1286
            non_supported_precondition = "x-amz-if-match-last-modified-time"
1✔
1287
        if non_supported_precondition:
1✔
1288
            LOG.warning(
1✔
1289
                "DeleteObject Preconditions is only supported for Directory Buckets. "
1290
                "LocalStack does not support Directory Buckets yet."
1291
            )
1292
            raise NotImplementedException(
1293
                "A header you provided implies functionality that is not implemented",
1294
                Header=non_supported_precondition,
1295
            )
1296

1297
        if s3_bucket.versioning_status is None:
1✔
1298
            if version_id and version_id != "null":
1✔
1299
                raise InvalidArgument(
1✔
1300
                    "Invalid version id specified",
1301
                    ArgumentName="versionId",
1302
                    ArgumentValue=version_id,
1303
                )
1304

1305
            found_object = s3_bucket.objects.pop(key, None)
1✔
1306
            # TODO: RequestCharged
1307
            if found_object:
1✔
1308
                self._storage_backend.remove(bucket, found_object)
1✔
1309
                self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1310
                store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
1311

1312
            return DeleteObjectOutput()
1✔
1313

1314
        if not version_id:
1✔
1315
            delete_marker_id = generate_version_id(s3_bucket.versioning_status)
1✔
1316
            delete_marker = S3DeleteMarker(key=key, version_id=delete_marker_id)
1✔
1317
            s3_bucket.objects.set(key, delete_marker)
1✔
1318
            s3_notif_ctx = S3EventNotificationContext.from_request_context(
1✔
1319
                context,
1320
                s3_bucket=s3_bucket,
1321
                s3_object=delete_marker,
1322
            )
1323
            s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
1✔
1324
            self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)
1✔
1325

1326
            return DeleteObjectOutput(VersionId=delete_marker.version_id, DeleteMarker=True)
1✔
1327

1328
        if key not in s3_bucket.objects:
1✔
UNCOV
1329
            return DeleteObjectOutput()
×
1330

1331
        if not (s3_object := s3_bucket.objects.get(key, version_id)):
1✔
1332
            raise InvalidArgument(
1✔
1333
                "Invalid version id specified",
1334
                ArgumentName="versionId",
1335
                ArgumentValue=version_id,
1336
            )
1337

1338
        if s3_object.is_locked(bypass_governance_retention):
1✔
1339
            raise AccessDenied("Access Denied because object protected by object lock.")
1✔
1340

1341
        s3_bucket.objects.pop(object_key=key, version_id=version_id)
1✔
1342
        response = DeleteObjectOutput(VersionId=s3_object.version_id)
1✔
1343

1344
        if isinstance(s3_object, S3DeleteMarker):
1✔
1345
            response["DeleteMarker"] = True
1✔
1346
        else:
1347
            self._storage_backend.remove(bucket, s3_object)
1✔
1348
            store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
1349
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
1350

1351
        if key not in s3_bucket.objects:
1✔
1352
            # we clean up keys that do not have any object versions in them anymore
1353
            self._preconditions_locks[bucket].pop(key, None)
1✔
1354

1355
        return response
1✔
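A rough illustration of the two delete paths above (delete-marker creation vs. removing a specific version), assuming boto3 against a LocalStack endpoint; endpoint, credentials, and names are placeholders.

import boto3

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.create_bucket(Bucket="versioned-demo")
s3.put_bucket_versioning(
    Bucket="versioned-demo", VersioningConfiguration={"Status": "Enabled"}
)
put = s3.put_object(Bucket="versioned-demo", Key="report.txt", Body=b"v1")

# no VersionId: a delete marker is created and returned
marker = s3.delete_object(Bucket="versioned-demo", Key="report.txt")
assert marker["DeleteMarker"] is True

# with a VersionId: that specific version is removed permanently
s3.delete_object(Bucket="versioned-demo", Key="report.txt", VersionId=put["VersionId"])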
1356

1357
    def delete_objects(
1✔
1358
        self,
1359
        context: RequestContext,
1360
        bucket: BucketName,
1361
        delete: Delete,
1362
        mfa: MFA = None,
1363
        request_payer: RequestPayer = None,
1364
        bypass_governance_retention: BypassGovernanceRetention = None,
1365
        expected_bucket_owner: AccountId = None,
1366
        checksum_algorithm: ChecksumAlgorithm = None,
1367
        **kwargs,
1368
    ) -> DeleteObjectsOutput:
1369
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1370

1371
        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
1✔
1372
            raise InvalidArgument(
1✔
1373
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
1374
                ArgumentName="x-amz-bypass-governance-retention",
1375
            )
1376

1377
        objects: list[ObjectIdentifier] = delete.get("Objects")
1✔
1378
        if not objects:
1✔
UNCOV
1379
            raise MalformedXML()
×
1380

1381
        # TODO: max 1000 delete at once? test against AWS?
1382

1383
        quiet = delete.get("Quiet", False)
1✔
1384
        deleted = []
1✔
1385
        errors = []
1✔
1386

1387
        to_remove = []
1✔
1388
        versioned_keys = set()
1✔
1389
        for to_delete_object in objects:
1✔
1390
            object_key = to_delete_object.get("Key")
1✔
1391
            version_id = to_delete_object.get("VersionId")
1✔
1392
            if s3_bucket.versioning_status is None:
1✔
1393
                if version_id and version_id != "null":
1✔
1394
                    errors.append(
1✔
1395
                        Error(
1396
                            Code="NoSuchVersion",
1397
                            Key=object_key,
1398
                            Message="The specified version does not exist.",
1399
                            VersionId=version_id,
1400
                        )
1401
                    )
1402
                    continue
1✔
1403

1404
                found_object = s3_bucket.objects.pop(object_key, None)
1✔
1405
                if found_object:
1✔
1406
                    to_remove.append(found_object)
1✔
1407
                    self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1408
                    store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
1✔
1409
                # small hack to not create a fake object for nothing
1410
                elif s3_bucket.notification_configuration:
1✔
1411
                    # DeleteObjects is a bit weird: even if the object didn't exist, S3 will trigger a notification
1412
                    # for a non-existing object being deleted
1413
                    self._notify(
1✔
1414
                        context, s3_bucket=s3_bucket, s3_object=S3Object(key=object_key, etag="")
1415
                    )
1416

1417
                if not quiet:
1✔
1418
                    deleted.append(DeletedObject(Key=object_key))
1✔
1419

1420
                continue
1✔
1421

1422
            if not version_id:
1✔
1423
                delete_marker_id = generate_version_id(s3_bucket.versioning_status)
1✔
1424
                delete_marker = S3DeleteMarker(key=object_key, version_id=delete_marker_id)
1✔
1425
                s3_bucket.objects.set(object_key, delete_marker)
1✔
1426
                s3_notif_ctx = S3EventNotificationContext.from_request_context(
1✔
1427
                    context,
1428
                    s3_bucket=s3_bucket,
1429
                    s3_object=delete_marker,
1430
                )
1431
                s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
1✔
1432
                self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)
1✔
1433

1434
                if not quiet:
1✔
1435
                    deleted.append(
1✔
1436
                        DeletedObject(
1437
                            DeleteMarker=True,
1438
                            DeleteMarkerVersionId=delete_marker_id,
1439
                            Key=object_key,
1440
                        )
1441
                    )
1442
                continue
1✔
1443

1444
            if not (
1✔
1445
                found_object := s3_bucket.objects.get(object_key=object_key, version_id=version_id)
1446
            ):
1447
                errors.append(
1✔
1448
                    Error(
1449
                        Code="NoSuchVersion",
1450
                        Key=object_key,
1451
                        Message="The specified version does not exist.",
1452
                        VersionId=version_id,
1453
                    )
1454
                )
1455
                continue
1✔
1456

1457
            if found_object.is_locked(bypass_governance_retention):
1✔
1458
                errors.append(
1✔
1459
                    Error(
1460
                        Code="AccessDenied",
1461
                        Key=object_key,
1462
                        Message="Access Denied because object protected by object lock.",
1463
                        VersionId=version_id,
1464
                    )
1465
                )
1466
                continue
1✔
1467

1468
            s3_bucket.objects.pop(object_key=object_key, version_id=version_id)
1✔
1469
            versioned_keys.add(object_key)
1✔
1470

1471
            if not quiet:
1✔
1472
                deleted_object = DeletedObject(
1✔
1473
                    Key=object_key,
1474
                    VersionId=version_id,
1475
                )
1476
                if isinstance(found_object, S3DeleteMarker):
1✔
1477
                    deleted_object["DeleteMarker"] = True
1✔
1478
                    deleted_object["DeleteMarkerVersionId"] = found_object.version_id
1✔
1479

1480
                deleted.append(deleted_object)
1✔
1481

1482
            if isinstance(found_object, S3Object):
1✔
1483
                to_remove.append(found_object)
1✔
1484

1485
            self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1486
            store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
1✔
1487

1488
        for versioned_key in versioned_keys:
1✔
1489
            # we clean up keys that do not have any object versions in them anymore
1490
            if versioned_key not in s3_bucket.objects:
1✔
1491
                self._preconditions_locks[bucket].pop(versioned_key, None)
1✔
1492

1493
        # TODO: request charged
1494
        self._storage_backend.remove(bucket, to_remove)
1✔
1495
        response: DeleteObjectsOutput = {}
1✔
1496
        # AWS validated: the list of Deleted objects is unordered, multiple identical calls can return different results
1497
        if errors:
1✔
1498
            response["Errors"] = errors
1✔
1499
        if not quiet:
1✔
1500
            response["Deleted"] = deleted
1✔
1501

1502
        return response
1✔
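A small sketch of the batch path above: with Quiet=True only Errors are echoed back, otherwise every Deleted entry is listed. Assumes boto3 against a LocalStack endpoint; names are illustrative.

import boto3

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.create_bucket(Bucket="batch-demo")
for i in range(3):
    s3.put_object(Bucket="batch-demo", Key=f"item-{i}", Body=b"data")

# quiet mode: only failures are reported, successful deletions are omitted
result = s3.delete_objects(
    Bucket="batch-demo",
    Delete={"Objects": [{"Key": f"item-{i}"} for i in range(3)], "Quiet": True},
)
assert "Deleted" not in result and "Errors" not in result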
1503

1504
    @handler("CopyObject", expand=False)
1✔
1505
    def copy_object(
1✔
1506
        self,
1507
        context: RequestContext,
1508
        request: CopyObjectRequest,
1509
    ) -> CopyObjectOutput:
1510
        # request_payer: RequestPayer = None,  # TODO:
1511
        dest_bucket = request["Bucket"]
1✔
1512
        dest_key = request["Key"]
1✔
1513
        validate_object_key(dest_key)
1✔
1514
        store, dest_s3_bucket = self._get_cross_account_bucket(context, dest_bucket)
1✔
1515

1516
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
1✔
1517
            request.get("CopySource")
1518
        )
1519
        _, src_s3_bucket = self._get_cross_account_bucket(context, src_bucket)
1✔
1520

1521
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
1522
            validate_kms_key_id(sse_kms_key_id, dest_s3_bucket)
1✔
1523

1524
        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
1525
        try:
1✔
1526
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
1✔
UNCOV
1527
        except MethodNotAllowed:
×
1528
            raise InvalidRequest(
×
1529
                "The source of a copy request may not specifically refer to a delete marker by version id."
1530
            )
1531

1532
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
1✔
UNCOV
1533
            raise InvalidObjectState(
×
1534
                "Operation is not valid for the source object's storage class",
1535
                StorageClass=src_s3_object.storage_class,
1536
            )
1537

1538
        if failed_condition := get_failed_precondition_copy_source(
1✔
1539
            request, src_s3_object.last_modified, src_s3_object.etag
1540
        ):
1541
            raise PreconditionFailed(
1✔
1542
                "At least one of the pre-conditions you specified did not hold",
1543
                Condition=failed_condition,
1544
            )
1545

1546
        source_sse_c_key_md5 = request.get("CopySourceSSECustomerKeyMD5")
1✔
1547
        if src_s3_object.sse_key_hash:
1✔
1548
            if not source_sse_c_key_md5:
1✔
1549
                raise InvalidRequest(
1✔
1550
                    "The object was stored using a form of Server Side Encryption. "
1551
                    "The correct parameters must be provided to retrieve the object."
1552
                )
1553
            elif src_s3_object.sse_key_hash != source_sse_c_key_md5:
1✔
UNCOV
1554
                raise AccessDenied("Access Denied")
×
1555

1556
        validate_sse_c(
1✔
1557
            algorithm=request.get("CopySourceSSECustomerAlgorithm"),
1558
            encryption_key=request.get("CopySourceSSECustomerKey"),
1559
            encryption_key_md5=source_sse_c_key_md5,
1560
        )
1561

1562
        target_sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
1563
        server_side_encryption = request.get("ServerSideEncryption")
1✔
1564
        # validate target SSE-C parameters
1565
        validate_sse_c(
1✔
1566
            algorithm=request.get("SSECustomerAlgorithm"),
1567
            encryption_key=request.get("SSECustomerKey"),
1568
            encryption_key_md5=target_sse_c_key_md5,
1569
            server_side_encryption=server_side_encryption,
1570
        )
1571

1572
        # TODO: validate the order of these validations
1573
        storage_class = request.get("StorageClass")
1✔
1574
        metadata_directive = request.get("MetadataDirective")
1✔
1575
        website_redirect_location = request.get("WebsiteRedirectLocation")
1✔
1576
        # we need an identity check on the encryption rule object, to see if the bucket default has been changed
1577
        is_default_encryption = (
1✔
1578
            dest_s3_bucket.encryption_rule is DEFAULT_BUCKET_ENCRYPTION
1579
            and src_s3_object.encryption == "AES256"
1580
        )
1581
        if (
1✔
1582
            src_bucket == dest_bucket
1583
            and src_key == dest_key
1584
            and not any(
1585
                (
1586
                    storage_class,
1587
                    server_side_encryption,
1588
                    target_sse_c_key_md5,
1589
                    metadata_directive == "REPLACE",
1590
                    website_redirect_location,
1591
                    dest_s3_bucket.encryption_rule
1592
                    and not is_default_encryption,  # S3 will allow copy in place if the bucket has encryption configured
1593
                    src_s3_object.restore,
1594
                )
1595
            )
1596
        ):
1597
            raise InvalidRequest(
1✔
1598
                "This copy request is illegal because it is trying to copy an object to itself without changing the "
1599
                "object's metadata, storage class, website redirect location or encryption attributes."
1600
            )
1601

1602
        if tagging := request.get("Tagging"):
1✔
1603
            tagging = parse_tagging_header(tagging)
1✔
1604

1605
        if metadata_directive == "REPLACE":
1✔
1606
            user_metadata = decode_user_metadata(request.get("Metadata"))
1✔
1607
            system_metadata = get_system_metadata_from_request(request)
1✔
1608
            if not system_metadata.get("ContentType"):
1✔
1609
                system_metadata["ContentType"] = "binary/octet-stream"
1✔
1610
        else:
1611
            user_metadata = src_s3_object.user_metadata
1✔
1612
            system_metadata = src_s3_object.system_metadata
1✔
1613

1614
        dest_version_id = generate_version_id(dest_s3_bucket.versioning_status)
1✔
1615

1616
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
1617
            request,
1618
            dest_s3_bucket,
1619
            store,
1620
        )
1621
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(
1✔
1622
            request, dest_s3_bucket
1623
        )
1624

1625
        acl = get_access_control_policy_for_new_resource_request(
1✔
1626
            request, owner=dest_s3_bucket.owner
1627
        )
1628
        checksum_algorithm = request.get("ChecksumAlgorithm")
1✔
1629

1630
        s3_object = S3Object(
1✔
1631
            key=dest_key,
1632
            size=src_s3_object.size,
1633
            version_id=dest_version_id,
1634
            storage_class=storage_class,
1635
            expires=request.get("Expires"),
1636
            user_metadata=user_metadata,
1637
            system_metadata=system_metadata,
1638
            checksum_algorithm=checksum_algorithm or src_s3_object.checksum_algorithm,
1639
            encryption=encryption_parameters.encryption,
1640
            kms_key_id=encryption_parameters.kms_key_id,
1641
            bucket_key_enabled=request.get(
1642
                "BucketKeyEnabled"
1643
            ),  # CopyObject does not inherit from the bucket here
1644
            sse_key_hash=target_sse_c_key_md5,
1645
            lock_mode=lock_parameters.lock_mode,
1646
            lock_legal_status=lock_parameters.lock_legal_status,
1647
            lock_until=lock_parameters.lock_until,
1648
            website_redirect_location=website_redirect_location,
1649
            expiration=None,  # TODO, from lifecycle
1650
            acl=acl,
1651
            owner=dest_s3_bucket.owner,
1652
        )
1653

1654
        with self._storage_backend.copy(
1✔
1655
            src_bucket=src_bucket,
1656
            src_object=src_s3_object,
1657
            dest_bucket=dest_bucket,
1658
            dest_object=s3_object,
1659
        ) as s3_stored_object:
1660
            s3_object.checksum_value = s3_stored_object.checksum or src_s3_object.checksum_value
1✔
1661
            s3_object.etag = s3_stored_object.etag or src_s3_object.etag
1✔
1662

1663
            dest_s3_bucket.objects.set(dest_key, s3_object)
1✔
1664

1665
        dest_key_id = get_unique_key_id(dest_bucket, dest_key, dest_version_id)
1✔
1666

1667
        if (request.get("TaggingDirective")) == "REPLACE":
1✔
1668
            store.TAGS.tags[dest_key_id] = tagging or {}
1✔
1669
        else:
1670
            src_key_id = get_unique_key_id(src_bucket, src_key, src_s3_object.version_id)
1✔
1671
            src_tags = store.TAGS.tags.get(src_key_id, {})
1✔
1672
            store.TAGS.tags[dest_key_id] = copy.copy(src_tags)
1✔
1673

1674
        copy_object_result = CopyObjectResult(
1✔
1675
            ETag=s3_object.quoted_etag,
1676
            LastModified=s3_object.last_modified,
1677
        )
1678
        if s3_object.checksum_algorithm:
1✔
1679
            copy_object_result[f"Checksum{s3_object.checksum_algorithm.upper()}"] = (
1✔
1680
                s3_object.checksum_value
1681
            )
1682

1683
        response = CopyObjectOutput(
1✔
1684
            CopyObjectResult=copy_object_result,
1685
        )
1686

1687
        if s3_object.version_id:
1✔
1688
            response["VersionId"] = s3_object.version_id
1✔
1689

1690
        if s3_object.expiration:
1✔
UNCOV
1691
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
1692

1693
        add_encryption_to_response(response, s3_object=s3_object)
1✔
1694
        if target_sse_c_key_md5:
1✔
1695
            response["SSECustomerAlgorithm"] = "AES256"
1✔
1696
            response["SSECustomerKeyMD5"] = target_sse_c_key_md5
1✔
1697

1698
        if (
1✔
1699
            src_s3_bucket.versioning_status
1700
            and src_s3_object.version_id
1701
            and src_s3_object.version_id != "null"
1702
        ):
1703
            response["CopySourceVersionId"] = src_s3_object.version_id
1✔
1704

1705
        # RequestCharged: Optional[RequestCharged] # TODO
1706
        self._notify(context, s3_bucket=dest_s3_bucket, s3_object=s3_object)
1✔
1707

1708
        return response
1✔
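A sketch of the copy-in-place rule enforced above: copying an object onto itself is only allowed when something about it changes, for example MetadataDirective=REPLACE. Assumes boto3 and a LocalStack endpoint; bucket and key names are illustrative.

import boto3
from botocore.exceptions import ClientError

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.create_bucket(Bucket="copy-demo")
s3.put_object(Bucket="copy-demo", Key="doc.txt", Body=b"hello")

try:
    # same bucket/key with nothing changed -> rejected as an illegal copy in place
    s3.copy_object(Bucket="copy-demo", Key="doc.txt",
                   CopySource={"Bucket": "copy-demo", "Key": "doc.txt"})
except ClientError as e:
    print(e.response["Error"]["Code"])  # InvalidRequest

# replacing the metadata makes the in-place copy legal
s3.copy_object(Bucket="copy-demo", Key="doc.txt",
               CopySource={"Bucket": "copy-demo", "Key": "doc.txt"},
               MetadataDirective="REPLACE", Metadata={"reviewed": "true"})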
1709

1710
    def list_objects(
1✔
1711
        self,
1712
        context: RequestContext,
1713
        bucket: BucketName,
1714
        delimiter: Delimiter = None,
1715
        encoding_type: EncodingType = None,
1716
        marker: Marker = None,
1717
        max_keys: MaxKeys = None,
1718
        prefix: Prefix = None,
1719
        request_payer: RequestPayer = None,
1720
        expected_bucket_owner: AccountId = None,
1721
        optional_object_attributes: OptionalObjectAttributesList = None,
1722
        **kwargs,
1723
    ) -> ListObjectsOutput:
1724
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1725

1726
        common_prefixes = set()
1✔
1727
        count = 0
1✔
1728
        is_truncated = False
1✔
1729
        next_key_marker = None
1✔
1730
        max_keys = max_keys or 1000
1✔
1731
        prefix = prefix or ""
1✔
1732
        delimiter = delimiter or ""
1✔
1733
        if encoding_type:
1✔
1734
            prefix = urlparse.quote(prefix)
1✔
1735
            delimiter = urlparse.quote(delimiter)
1✔
1736

1737
        s3_objects: list[Object] = []
1✔
1738

1739
        all_keys = sorted(s3_bucket.objects.values(), key=lambda r: r.key)
1✔
1740
        last_key = all_keys[-1] if all_keys else None
1✔
1741

1742
        # iterate in key order (all_keys is already sorted above)
1743
        for s3_object in all_keys:
1✔
1744
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
1✔
1745
            # skip all keys that alphabetically come before the marker
1746
            if marker:
1✔
1747
                if key <= marker:
1✔
1748
                    continue
1✔
1749

1750
            # Filter for keys that start with prefix
1751
            if prefix and not key.startswith(prefix):
1✔
UNCOV
1752
                continue
×
1753

1754
            # see ListObjectsV2 for the logic comments (shared logic here)
1755
            prefix_including_delimiter = None
1✔
1756
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
1757
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
1758
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
1759

1760
                if prefix_including_delimiter in common_prefixes or (
1✔
1761
                    marker and marker.startswith(prefix_including_delimiter)
1762
                ):
1763
                    continue
1✔
1764

1765
            if prefix_including_delimiter:
1✔
1766
                common_prefixes.add(prefix_including_delimiter)
1✔
1767
            else:
1768
                # TODO: add RestoreStatus if present
1769
                object_data = Object(
1✔
1770
                    Key=key,
1771
                    ETag=s3_object.quoted_etag,
1772
                    Owner=s3_bucket.owner,  # TODO: verify reality
1773
                    Size=s3_object.size,
1774
                    LastModified=s3_object.last_modified,
1775
                    StorageClass=s3_object.storage_class,
1776
                )
1777

1778
                if s3_object.checksum_algorithm:
1✔
1779
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
1✔
1780
                    object_data["ChecksumType"] = s3_object.checksum_type
1✔
1781

1782
                s3_objects.append(object_data)
1✔
1783

1784
            # we just added a CommonPrefix or an Object, increase the counter
1785
            count += 1
1✔
1786
            if count >= max_keys and last_key.key != s3_object.key:
1✔
1787
                is_truncated = True
1✔
1788
                if prefix_including_delimiter:
1✔
1789
                    next_key_marker = prefix_including_delimiter
1✔
1790
                elif s3_objects:
1✔
1791
                    next_key_marker = s3_objects[-1]["Key"]
1✔
1792
                break
1✔
1793

1794
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
1795

1796
        response = ListObjectsOutput(
1✔
1797
            IsTruncated=is_truncated,
1798
            Name=bucket,
1799
            MaxKeys=max_keys,
1800
            Prefix=prefix or "",
1801
            Marker=marker or "",
1802
        )
1803
        if s3_objects:
1✔
1804
            response["Contents"] = s3_objects
1✔
1805
        if encoding_type:
1✔
1806
            response["EncodingType"] = EncodingType.url
1✔
1807
        if delimiter:
1✔
1808
            response["Delimiter"] = delimiter
1✔
1809
        if common_prefixes:
1✔
1810
            response["CommonPrefixes"] = common_prefixes
1✔
1811
        if delimiter and next_key_marker:
1✔
1812
            response["NextMarker"] = next_key_marker
1✔
1813
        if s3_bucket.bucket_region != "us-east-1":
1✔
UNCOV
1814
            response["BucketRegion"] = s3_bucket.bucket_region
×
1815

1816
        # RequestCharged: Optional[RequestCharged]  # TODO
1817
        return response
1✔
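A short sketch of the Prefix/Delimiter grouping above: keys sharing the segment between the prefix and the first delimiter collapse into a single CommonPrefix. Assumes boto3 against a LocalStack endpoint; names are illustrative.

import boto3

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.create_bucket(Bucket="list-demo")
for key in ("logs/2026/01.log", "logs/2026/02.log", "logs/readme.txt"):
    s3.put_object(Bucket="list-demo", Key=key, Body=b"")

listing = s3.list_objects(Bucket="list-demo", Prefix="logs/", Delimiter="/")
# the two dated logs collapse into one CommonPrefix, the readme stays in Contents
print([p["Prefix"] for p in listing["CommonPrefixes"]])  # ['logs/2026/']
print([o["Key"] for o in listing["Contents"]])           # ['logs/readme.txt']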
1818

1819
    def list_objects_v2(
1✔
1820
        self,
1821
        context: RequestContext,
1822
        bucket: BucketName,
1823
        delimiter: Delimiter = None,
1824
        encoding_type: EncodingType = None,
1825
        max_keys: MaxKeys = None,
1826
        prefix: Prefix = None,
1827
        continuation_token: Token = None,
1828
        fetch_owner: FetchOwner = None,
1829
        start_after: StartAfter = None,
1830
        request_payer: RequestPayer = None,
1831
        expected_bucket_owner: AccountId = None,
1832
        optional_object_attributes: OptionalObjectAttributesList = None,
1833
        **kwargs,
1834
    ) -> ListObjectsV2Output:
1835
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1836

1837
        if continuation_token == "":
1✔
1838
            raise InvalidArgument(
1✔
1839
                "The continuation token provided is incorrect",
1840
                ArgumentName="continuation-token",
1841
            )
1842

1843
        common_prefixes = set()
1✔
1844
        count = 0
1✔
1845
        is_truncated = False
1✔
1846
        next_continuation_token = None
1✔
1847
        max_keys = max_keys or 1000
1✔
1848
        prefix = prefix or ""
1✔
1849
        delimiter = delimiter or ""
1✔
1850
        if encoding_type:
1✔
1851
            prefix = urlparse.quote(prefix)
1✔
1852
            delimiter = urlparse.quote(delimiter)
1✔
1853
        decoded_continuation_token = (
1✔
1854
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
1855
            if continuation_token
1856
            else None
1857
        )
1858

1859
        s3_objects: list[Object] = []
1✔
1860

1861
        # sort by key
1862
        for s3_object in sorted(s3_bucket.objects.values(), key=lambda r: r.key):
1✔
1863
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
1✔
1864

1865
            # skip all keys that alphabetically come before continuation_token
1866
            if continuation_token:
1✔
1867
                if key < decoded_continuation_token:
1✔
1868
                    continue
1✔
1869

1870
            elif start_after:
1✔
1871
                if key <= start_after:
1✔
1872
                    continue
1✔
1873

1874
            # Filter for keys that start with prefix
1875
            if prefix and not key.startswith(prefix):
1✔
1876
                continue
1✔
1877

1878
            # separate keys that contain the same string between the prefix and the first occurrence of the delimiter
1879
            prefix_including_delimiter = None
1✔
1880
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
1881
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
1882
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
1883

1884
                # if the CommonPrefix is already in the CommonPrefixes, it doesn't count towards MaxKeys, so we can skip
1885
                # the entry without increasing the counter. We need to iterate over all of these entries before
1886
                # returning the next continuation marker, to properly start at the next key after this CommonPrefix
1887
                if prefix_including_delimiter in common_prefixes:
1✔
1888
                    continue
1✔
1889

1890
            # After skipping all entries, verify we're not over the MaxKeys before adding a new entry
1891
            if count >= max_keys:
1✔
1892
                is_truncated = True
1✔
1893
                next_continuation_token = to_str(base64.urlsafe_b64encode(s3_object.key.encode()))
1✔
1894
                break
1✔
1895

1896
            # if we found a new CommonPrefix, add it to the CommonPrefixes
1897
            # else, it means it's a new Object, add it to the Contents
1898
            if prefix_including_delimiter:
1✔
1899
                common_prefixes.add(prefix_including_delimiter)
1✔
1900
            else:
1901
                # TODO: add RestoreStatus if present
1902
                object_data = Object(
1✔
1903
                    Key=key,
1904
                    ETag=s3_object.quoted_etag,
1905
                    Size=s3_object.size,
1906
                    LastModified=s3_object.last_modified,
1907
                    StorageClass=s3_object.storage_class,
1908
                )
1909

1910
                if fetch_owner:
1✔
UNCOV
1911
                    object_data["Owner"] = s3_bucket.owner
×
1912

1913
                if s3_object.checksum_algorithm:
1✔
1914
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
1✔
1915
                    object_data["ChecksumType"] = s3_object.checksum_type
1✔
1916

1917
                s3_objects.append(object_data)
1✔
1918

1919
            # we just added either a CommonPrefix or an Object to the List, increase the counter by one
1920
            count += 1
1✔
1921

1922
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
1923

1924
        response = ListObjectsV2Output(
1✔
1925
            IsTruncated=is_truncated,
1926
            Name=bucket,
1927
            MaxKeys=max_keys,
1928
            Prefix=prefix or "",
1929
            KeyCount=count,
1930
        )
1931
        if s3_objects:
1✔
1932
            response["Contents"] = s3_objects
1✔
1933
        if encoding_type:
1✔
1934
            response["EncodingType"] = EncodingType.url
1✔
1935
        if delimiter:
1✔
1936
            response["Delimiter"] = delimiter
1✔
1937
        if common_prefixes:
1✔
1938
            response["CommonPrefixes"] = common_prefixes
1✔
1939
        if next_continuation_token:
1✔
1940
            response["NextContinuationToken"] = next_continuation_token
1✔
1941

1942
        if continuation_token:
1✔
1943
            response["ContinuationToken"] = continuation_token
1✔
1944
        elif start_after:
1✔
1945
            response["StartAfter"] = start_after
1✔
1946

1947
        if s3_bucket.bucket_region != "us-east-1":
1✔
1948
            response["BucketRegion"] = s3_bucket.bucket_region
1✔
1949

1950
        # RequestCharged: Optional[RequestCharged]  # TODO
1951
        return response
1✔
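A minimal pagination sketch for the MaxKeys/ContinuationToken handling above (in this provider the token is an opaque, base64-encoded key). Assumes boto3 and a LocalStack endpoint; names are illustrative.

import boto3

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.create_bucket(Bucket="page-demo")
for i in range(5):
    s3.put_object(Bucket="page-demo", Key=f"key-{i}", Body=b"")

keys, token = [], None
while True:
    kwargs = {"Bucket": "page-demo", "MaxKeys": 2}
    if token:
        kwargs["ContinuationToken"] = token
    page = s3.list_objects_v2(**kwargs)
    keys += [obj["Key"] for obj in page.get("Contents", [])]
    if not page["IsTruncated"]:
        break
    token = page["NextContinuationToken"]

assert keys == [f"key-{i}" for i in range(5)]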
1952

1953
    def list_object_versions(
1✔
1954
        self,
1955
        context: RequestContext,
1956
        bucket: BucketName,
1957
        delimiter: Delimiter = None,
1958
        encoding_type: EncodingType = None,
1959
        key_marker: KeyMarker = None,
1960
        max_keys: MaxKeys = None,
1961
        prefix: Prefix = None,
1962
        version_id_marker: VersionIdMarker = None,
1963
        expected_bucket_owner: AccountId = None,
1964
        request_payer: RequestPayer = None,
1965
        optional_object_attributes: OptionalObjectAttributesList = None,
1966
        **kwargs,
1967
    ) -> ListObjectVersionsOutput:
1968
        if version_id_marker and not key_marker:
1✔
1969
            raise InvalidArgument(
1✔
1970
                "A version-id marker cannot be specified without a key marker.",
1971
                ArgumentName="version-id-marker",
1972
                ArgumentValue=version_id_marker,
1973
            )
1974

1975
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1976
        common_prefixes = set()
1✔
1977
        count = 0
1✔
1978
        is_truncated = False
1✔
1979
        next_key_marker = None
1✔
1980
        next_version_id_marker = None
1✔
1981
        max_keys = max_keys or 1000
1✔
1982
        prefix = prefix or ""
1✔
1983
        delimiter = delimiter or ""
1✔
1984
        if encoding_type:
1✔
1985
            prefix = urlparse.quote(prefix)
1✔
1986
            delimiter = urlparse.quote(delimiter)
1✔
1987
        version_key_marker_found = False
1✔
1988

1989
        object_versions: list[ObjectVersion] = []
1✔
1990
        delete_markers: list[DeleteMarkerEntry] = []
1✔
1991

1992
        all_versions = s3_bucket.objects.values(with_versions=True)
1✔
1993
        # sort by key and last-modified date (descending), so the latest version of each key comes first
1994
        all_versions.sort(key=lambda r: (r.key, -r.last_modified.timestamp()))
1✔
1995
        last_version = all_versions[-1] if all_versions else None
1✔
1996

1997
        for version in all_versions:
1✔
1998
            key = urlparse.quote(version.key) if encoding_type else version.key
1✔
1999
            # skip all keys that alphabetically come before key_marker
2000
            if key_marker:
1✔
2001
                if key < key_marker:
1✔
2002
                    continue
1✔
2003
                elif key == key_marker:
1✔
2004
                    if not version_id_marker:
1✔
2005
                        continue
1✔
2006
                    # as the versions are ordered by time, once we find the version_id_marker, we can start returning from the next one
2007
                    if version.version_id == version_id_marker:
1✔
2008
                        version_key_marker_found = True
1✔
2009
                        continue
1✔
2010

2011
                    # it is possible that the object related to version_id_marker has been deleted; in that case, start
2012
                    # as soon as the next version id is older than the version id marker (meaning this version was
2013
                    # next after the now-deleted version)
2014
                    elif is_version_older_than_other(version.version_id, version_id_marker):
1✔
2015
                        version_key_marker_found = True
1✔
2016

2017
                    elif not version_key_marker_found:
1✔
2018
                        # as long as we have not passed the version_key_marker, skip the versions
2019
                        continue
1✔
2020

2021
            # Filter for keys that start with prefix
2022
            if prefix and not key.startswith(prefix):
1✔
2023
                continue
1✔
2024

2025
            # see ListObjectsV2 for the logic comments (shared logic here)
2026
            prefix_including_delimiter = None
1✔
2027
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
2028
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
2029
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
2030

2031
                if prefix_including_delimiter in common_prefixes or (
1✔
2032
                    key_marker and key_marker.startswith(prefix_including_delimiter)
2033
                ):
2034
                    continue
1✔
2035

2036
            if prefix_including_delimiter:
1✔
2037
                common_prefixes.add(prefix_including_delimiter)
1✔
2038

2039
            elif isinstance(version, S3DeleteMarker):
1✔
2040
                delete_marker = DeleteMarkerEntry(
1✔
2041
                    Key=key,
2042
                    Owner=s3_bucket.owner,
2043
                    VersionId=version.version_id,
2044
                    IsLatest=version.is_current,
2045
                    LastModified=version.last_modified,
2046
                )
2047
                delete_markers.append(delete_marker)
1✔
2048
            else:
2049
                # TODO: add RestoreStatus if present
2050
                object_version = ObjectVersion(
1✔
2051
                    Key=key,
2052
                    ETag=version.quoted_etag,
2053
                    Owner=s3_bucket.owner,  # TODO: verify reality
2054
                    Size=version.size,
2055
                    VersionId=version.version_id or "null",
2056
                    LastModified=version.last_modified,
2057
                    IsLatest=version.is_current,
2058
                    # TODO: verify this, are other classes possible?
2059
                    # StorageClass=version.storage_class,
2060
                    StorageClass=ObjectVersionStorageClass.STANDARD,
2061
                )
2062

2063
                if version.checksum_algorithm:
1✔
2064
                    object_version["ChecksumAlgorithm"] = [version.checksum_algorithm]
1✔
2065
                    object_version["ChecksumType"] = version.checksum_type
1✔
2066

2067
                object_versions.append(object_version)
1✔
2068

2069
            # we just added a CommonPrefix, an Object or a DeleteMarker, increase the counter
2070
            count += 1
1✔
2071
            if count >= max_keys and last_version.version_id != version.version_id:
1✔
2072
                is_truncated = True
1✔
2073
                if prefix_including_delimiter:
1✔
2074
                    next_key_marker = prefix_including_delimiter
1✔
2075
                else:
2076
                    next_key_marker = version.key
1✔
2077
                    next_version_id_marker = version.version_id
1✔
2078
                break
1✔
2079

2080
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
2081

2082
        response = ListObjectVersionsOutput(
1✔
2083
            IsTruncated=is_truncated,
2084
            Name=bucket,
2085
            MaxKeys=max_keys,
2086
            Prefix=prefix,
2087
            KeyMarker=key_marker or "",
2088
            VersionIdMarker=version_id_marker or "",
2089
        )
2090
        if object_versions:
1✔
2091
            response["Versions"] = object_versions
1✔
2092
        if encoding_type:
1✔
2093
            response["EncodingType"] = EncodingType.url
1✔
2094
        if delete_markers:
1✔
2095
            response["DeleteMarkers"] = delete_markers
1✔
2096
        if delimiter:
1✔
2097
            response["Delimiter"] = delimiter
1✔
2098
        if common_prefixes:
1✔
2099
            response["CommonPrefixes"] = common_prefixes
1✔
2100
        if next_key_marker:
1✔
2101
            response["NextKeyMarker"] = next_key_marker
1✔
2102
        if next_version_id_marker:
1✔
2103
            response["NextVersionIdMarker"] = next_version_id_marker
1✔
2104

2105
        # RequestCharged: Optional[RequestCharged]  # TODO
2106
        return response
1✔
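A brief sketch of the version listing above: current and non-current object versions appear under Versions, while delete markers are reported separately. Assumes boto3 against a LocalStack endpoint; names are illustrative.

import boto3

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.create_bucket(Bucket="versions-demo")
s3.put_bucket_versioning(Bucket="versions-demo",
                         VersioningConfiguration={"Status": "Enabled"})
s3.put_object(Bucket="versions-demo", Key="state.json", Body=b"v1")
s3.put_object(Bucket="versions-demo", Key="state.json", Body=b"v2")
s3.delete_object(Bucket="versions-demo", Key="state.json")  # adds a delete marker

versions = s3.list_object_versions(Bucket="versions-demo")
print(len(versions["Versions"]))       # 2 object versions
print(len(versions["DeleteMarkers"]))  # 1 delete marker, currently the latest entry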
2107

2108
    @handler("GetObjectAttributes", expand=False)
1✔
2109
    def get_object_attributes(
1✔
2110
        self,
2111
        context: RequestContext,
2112
        request: GetObjectAttributesRequest,
2113
    ) -> GetObjectAttributesOutput:
2114
        bucket_name = request["Bucket"]
1✔
2115
        object_key = request["Key"]
1✔
2116
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2117

2118
        s3_object = s3_bucket.get_object(
1✔
2119
            key=object_key,
2120
            version_id=request.get("VersionId"),
2121
            http_method="GET",
2122
        )
2123

2124
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2125
        if s3_object.sse_key_hash:
1✔
2126
            if not sse_c_key_md5:
1✔
UNCOV
2127
                raise InvalidRequest(
×
2128
                    "The object was stored using a form of Server Side Encryption. "
2129
                    "The correct parameters must be provided to retrieve the object."
2130
                )
2131
            elif s3_object.sse_key_hash != sse_c_key_md5:
1✔
UNCOV
2132
                raise AccessDenied("Access Denied")
×
2133

2134
        validate_sse_c(
1✔
2135
            algorithm=request.get("SSECustomerAlgorithm"),
2136
            encryption_key=request.get("SSECustomerKey"),
2137
            encryption_key_md5=sse_c_key_md5,
2138
        )
2139

2140
        object_attrs = request.get("ObjectAttributes", [])
1✔
2141
        response = GetObjectAttributesOutput()
1✔
2142
        object_checksum_type = s3_object.checksum_type
1✔
2143
        if "ETag" in object_attrs:
1✔
2144
            response["ETag"] = s3_object.etag
1✔
2145
        if "StorageClass" in object_attrs:
1✔
2146
            response["StorageClass"] = s3_object.storage_class
1✔
2147
        if "ObjectSize" in object_attrs:
1✔
2148
            response["ObjectSize"] = s3_object.size
1✔
2149
        if "Checksum" in object_attrs and (checksum_algorithm := s3_object.checksum_algorithm):
1✔
2150
            if s3_object.parts:
1✔
2151
                checksum_value = s3_object.checksum_value.split("-")[0]
1✔
2152
            else:
2153
                checksum_value = s3_object.checksum_value
1✔
2154
            response["Checksum"] = {
1✔
2155
                f"Checksum{checksum_algorithm.upper()}": checksum_value,
2156
                "ChecksumType": object_checksum_type,
2157
            }
2158

2159
        response["LastModified"] = s3_object.last_modified
1✔
2160

2161
        if s3_bucket.versioning_status:
1✔
2162
            response["VersionId"] = s3_object.version_id
1✔
2163

2164
        if "ObjectParts" in object_attrs and s3_object.parts:
1✔
2165
            if object_checksum_type == ChecksumType.FULL_OBJECT:
1✔
2166
                response["ObjectParts"] = GetObjectAttributesParts(
1✔
2167
                    TotalPartsCount=len(s3_object.parts)
2168
                )
2169
            else:
2170
                # this is basically a simplified `ListParts` call on the object, only returned when the checksum type is
2171
                # COMPOSITE
2172
                count = 0
1✔
2173
                is_truncated = False
1✔
2174
                part_number_marker = request.get("PartNumberMarker") or 0
1✔
2175
                max_parts = request.get("MaxParts") or 1000
1✔
2176

2177
                parts = []
1✔
2178
                all_parts = sorted(
1✔
2179
                    (int(part_number), part) for part_number, part in s3_object.parts.items()
2180
                )
2181
                last_part_number, last_part = all_parts[-1]
1✔
2182

2183
                for part_number, part in all_parts:
1✔
2184
                    if part_number <= part_number_marker:
1✔
2185
                        continue
1✔
2186
                    part_item = select_from_typed_dict(ObjectPart, part)
1✔
2187

2188
                    parts.append(part_item)
1✔
2189
                    count += 1
1✔
2190

2191
                    if count >= max_parts and part["PartNumber"] != last_part_number:
1✔
2192
                        is_truncated = True
1✔
2193
                        break
1✔
2194

2195
                object_parts = GetObjectAttributesParts(
1✔
2196
                    TotalPartsCount=len(s3_object.parts),
2197
                    IsTruncated=is_truncated,
2198
                    MaxParts=max_parts,
2199
                    PartNumberMarker=part_number_marker,
2200
                    NextPartNumberMarker=0,
2201
                )
2202
                if parts:
1✔
2203
                    object_parts["Parts"] = parts
1✔
2204
                    object_parts["NextPartNumberMarker"] = parts[-1]["PartNumber"]
1✔
2205

2206
                response["ObjectParts"] = object_parts
1✔
2207

2208
        return response
1✔
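A small sketch of GetObjectAttributes as implemented above: only the requested attributes are returned, and ObjectParts is only populated for multipart objects. Assumes boto3 and a LocalStack endpoint; names are illustrative.

import boto3

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.create_bucket(Bucket="attrs-demo")
s3.put_object(Bucket="attrs-demo", Key="blob", Body=b"payload",
              ChecksumAlgorithm="SHA256")

attrs = s3.get_object_attributes(
    Bucket="attrs-demo",
    Key="blob",
    ObjectAttributes=["ETag", "ObjectSize", "StorageClass", "Checksum"],
)
print(attrs["ObjectSize"])                  # 7
print(attrs["Checksum"]["ChecksumSHA256"])  # base64-encoded SHA-256 of the body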
2209

2210
    def restore_object(
1✔
2211
        self,
2212
        context: RequestContext,
2213
        bucket: BucketName,
2214
        key: ObjectKey,
2215
        version_id: ObjectVersionId = None,
2216
        restore_request: RestoreRequest = None,
2217
        request_payer: RequestPayer = None,
2218
        checksum_algorithm: ChecksumAlgorithm = None,
2219
        expected_bucket_owner: AccountId = None,
2220
        **kwargs,
2221
    ) -> RestoreObjectOutput:
2222
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2223

2224
        s3_object = s3_bucket.get_object(
1✔
2225
            key=key,
2226
            version_id=version_id,
2227
            http_method="GET",  # TODO: verify http method
2228
        )
2229
        if s3_object.storage_class not in ARCHIVES_STORAGE_CLASSES:
1✔
UNCOV
2230
            raise InvalidObjectState(StorageClass=s3_object.storage_class)
×
2231

2232
        # TODO: moto only supported the "Days" parameter from RestoreRequest and ignored the others;
2233
        # we only implement the same functionality for now
2234

2235
        # if a request was already done and the object was available, and we're updating it, set the status code to 200
2236
        status_code = 200 if s3_object.restore else 202
1✔
2237
        restore_days = restore_request.get("Days")
1✔
2238
        if not restore_days:
1✔
UNCOV
2239
            LOG.debug("LocalStack does not support restore SELECT requests yet.")
×
UNCOV
2240
            return RestoreObjectOutput()
×
2241

2242
        restore_expiration_date = add_expiration_days_to_datetime(
1✔
2243
            datetime.datetime.now(datetime.UTC), restore_days
2244
        )
2245
        # TODO: add a way to transition from ongoing-request=true to false? for now it is instant
2246
        s3_object.restore = f'ongoing-request="false", expiry-date="{restore_expiration_date}"'
1✔
2247

2248
        s3_notif_ctx_initiated = S3EventNotificationContext.from_request_context(
1✔
2249
            context,
2250
            s3_bucket=s3_bucket,
2251
            s3_object=s3_object,
2252
        )
2253
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_initiated)
1✔
2254
        # But because it's instant in LocalStack, we can directly send the Completed notification as well
2255
        # We just need to copy the context so that we don't mutate the first context while it could be sent
2256
        # And modify its event type from `ObjectRestore:Post` to `ObjectRestore:Completed`
2257
        s3_notif_ctx_completed = copy.copy(s3_notif_ctx_initiated)
1✔
2258
        s3_notif_ctx_completed.event_type = s3_notif_ctx_completed.event_type.replace(
1✔
2259
            "Post", "Completed"
2260
        )
2261
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_completed)
1✔
2262

2263
        # TODO: request charged
2264
        return RestoreObjectOutput(StatusCode=status_code)
1✔
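A sketch of the restore flow above: the first RestoreObject on an archived object is accepted with 202, and repeating it while a restore is in effect returns 200; as noted in the comments, the restore is instantaneous in LocalStack. Assumes boto3 against a LocalStack endpoint; names are illustrative.

import boto3

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.create_bucket(Bucket="archive-demo")
s3.put_object(Bucket="archive-demo", Key="cold.bin", Body=b"frozen",
              StorageClass="GLACIER")

first = s3.restore_object(Bucket="archive-demo", Key="cold.bin",
                          RestoreRequest={"Days": 1})
again = s3.restore_object(Bucket="archive-demo", Key="cold.bin",
                          RestoreRequest={"Days": 1})
print(first["ResponseMetadata"]["HTTPStatusCode"])  # 202 (restore accepted)
print(again["ResponseMetadata"]["HTTPStatusCode"])  # 200 (already restored)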
2265

2266
    @handler("CreateMultipartUpload", expand=False)
1✔
2267
    def create_multipart_upload(
1✔
2268
        self,
2269
        context: RequestContext,
2270
        request: CreateMultipartUploadRequest,
2271
    ) -> CreateMultipartUploadOutput:
2272
        # TODO: handle missing parameters:
2273
        #  request_payer: RequestPayer = None,
2274
        bucket_name = request["Bucket"]
1✔
2275
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2276

2277
        if (storage_class := request.get("StorageClass")) is not None and (
1✔
2278
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
2279
        ):
2280
            raise InvalidStorageClass(
1✔
2281
                "The storage class you specified is not valid", StorageClassRequested=storage_class
2282
            )
2283

2284
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
2285
            validate_kms_key_id(sse_kms_key_id, s3_bucket)
1✔
2286

2287
        if tagging := request.get("Tagging"):
1✔
UNCOV
2288
            tagging = parse_tagging_header(tagging_header=tagging)
×
2289

2290
        key = request["Key"]
1✔
2291

2292
        system_metadata = get_system_metadata_from_request(request)
1✔
2293
        if not system_metadata.get("ContentType"):
1✔
2294
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
2295

2296
        user_metadata = decode_user_metadata(request.get("Metadata"))
1✔
2297

2298
        checksum_algorithm = request.get("ChecksumAlgorithm")
1✔
2299
        if checksum_algorithm and checksum_algorithm not in CHECKSUM_ALGORITHMS:
1✔
2300
            raise InvalidRequest(
1✔
2301
                "Checksum algorithm provided is unsupported. Please try again with any of the valid types: [CRC32, CRC32C, SHA1, SHA256]"
2302
            )
2303

2304
        if not (checksum_type := request.get("ChecksumType")) and checksum_algorithm:
1✔
2305
            if checksum_algorithm == ChecksumAlgorithm.CRC64NVME:
1✔
2306
                checksum_type = ChecksumType.FULL_OBJECT
1✔
2307
            else:
2308
                checksum_type = ChecksumType.COMPOSITE
1✔
2309
        elif checksum_type and not checksum_algorithm:
1✔
2310
            raise InvalidRequest(
1✔
2311
                "The x-amz-checksum-type header can only be used with the x-amz-checksum-algorithm header."
2312
            )
2313

2314
        if (
1✔
2315
            checksum_type == ChecksumType.COMPOSITE
2316
            and checksum_algorithm == ChecksumAlgorithm.CRC64NVME
2317
        ):
2318
            raise InvalidRequest(
1✔
2319
                "The COMPOSITE checksum type cannot be used with the crc64nvme checksum algorithm."
2320
            )
2321
        elif checksum_type == ChecksumType.FULL_OBJECT and checksum_algorithm.upper().startswith(
1✔
2322
            "SHA"
2323
        ):
2324
            raise InvalidRequest(
1✔
2325
                f"The FULL_OBJECT checksum type cannot be used with the {checksum_algorithm.lower()} checksum algorithm."
2326
            )
2327

2328
        # TODO: we're not encrypting the object with the provided key for now
2329
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2330
        validate_sse_c(
1✔
2331
            algorithm=request.get("SSECustomerAlgorithm"),
2332
            encryption_key=request.get("SSECustomerKey"),
2333
            encryption_key_md5=sse_c_key_md5,
2334
            server_side_encryption=request.get("ServerSideEncryption"),
2335
        )
2336

2337
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
2338
            request,
2339
            s3_bucket,
2340
            store,
2341
        )
2342
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
1✔
2343

2344
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)
1✔
2345

2346
        # validate encryption values
2347
        s3_multipart = S3Multipart(
1✔
2348
            key=key,
2349
            storage_class=storage_class,
2350
            expires=request.get("Expires"),
2351
            user_metadata=user_metadata,
2352
            system_metadata=system_metadata,
2353
            checksum_algorithm=checksum_algorithm,
2354
            checksum_type=checksum_type,
2355
            encryption=encryption_parameters.encryption,
2356
            kms_key_id=encryption_parameters.kms_key_id,
2357
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
2358
            sse_key_hash=sse_c_key_md5,
2359
            lock_mode=lock_parameters.lock_mode,
2360
            lock_legal_status=lock_parameters.lock_legal_status,
2361
            lock_until=lock_parameters.lock_until,
2362
            website_redirect_location=request.get("WebsiteRedirectLocation"),
2363
            expiration=None,  # TODO, from lifecycle, or should it be updated with config?
2364
            acl=acl,
2365
            initiator=get_owner_for_account_id(context.account_id),
2366
            tagging=tagging,
2367
            owner=s3_bucket.owner,
2368
            precondition=object_exists_for_precondition_write(s3_bucket, key),
2369
        )
2370
        # it seems if there is SSE-C on the multipart, AWS S3 will override the default Checksum behavior (but not on
2371
        # PutObject)
2372
        if sse_c_key_md5:
1✔
2373
            s3_multipart.object.checksum_algorithm = None
1✔
2374

2375
        s3_bucket.multiparts[s3_multipart.id] = s3_multipart
1✔
2376

2377
        response = CreateMultipartUploadOutput(
1✔
2378
            Bucket=bucket_name, Key=key, UploadId=s3_multipart.id
2379
        )
2380

2381
        if checksum_algorithm:
1✔
2382
            response["ChecksumAlgorithm"] = checksum_algorithm
1✔
2383
            response["ChecksumType"] = checksum_type
1✔
2384

2385
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2386
        if sse_c_key_md5:
1✔
2387
            response["SSECustomerAlgorithm"] = "AES256"
1✔
2388
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
2389

2390
        # TODO: missing response fields we're not currently supporting
2391
        # - AbortDate: lifecycle related, not currently supported, todo
2392
        # - AbortRuleId: lifecycle related, not currently supported, todo
2393
        # - RequestCharged: todo
2394

2395
        return response
1✔
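A sketch of the checksum pairing rules validated above: SHA-based algorithms default to (and only allow) COMPOSITE, while CRC64NVME implies FULL_OBJECT. This assumes boto3 with a botocore recent enough to expose ChecksumAlgorithm/ChecksumType on CreateMultipartUpload, against a LocalStack endpoint; names are illustrative.

import boto3
from botocore.exceptions import ClientError

# illustrative setup: LocalStack edge endpoint and dummy credentials (assumed)
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")
s3.create_bucket(Bucket="mpu-demo")

# SHA256 without an explicit type defaults to COMPOSITE
mpu = s3.create_multipart_upload(Bucket="mpu-demo", Key="big.bin",
                                 ChecksumAlgorithm="SHA256")
print(mpu["ChecksumType"])  # COMPOSITE

# SHA256 combined with FULL_OBJECT is rejected
try:
    s3.create_multipart_upload(Bucket="mpu-demo", Key="big.bin",
                               ChecksumAlgorithm="SHA256",
                               ChecksumType="FULL_OBJECT")
except ClientError as e:
    print(e.response["Error"]["Code"])  # InvalidRequest

s3.abort_multipart_upload(Bucket="mpu-demo", Key="big.bin", UploadId=mpu["UploadId"])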
2396

2397
    @handler("UploadPart", expand=False)
1✔
2398
    def upload_part(
1✔
2399
        self,
2400
        context: RequestContext,
2401
        request: UploadPartRequest,
2402
    ) -> UploadPartOutput:
2403
        # TODO: missing the following parameters:
2404
        #  content_length: ContentLength = None, ->validate?
2405
        #  content_md5: ContentMD5 = None, -> validate?
2406
        #  request_payer: RequestPayer = None,
2407
        bucket_name = request["Bucket"]
1✔
2408
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2409

2410
        upload_id = request.get("UploadId")
1✔
2411
        if not (
1✔
2412
            s3_multipart := s3_bucket.multiparts.get(upload_id)
2413
        ) or s3_multipart.object.key != request.get("Key"):
2414
            raise NoSuchUpload(
1✔
2415
                "The specified upload does not exist. "
2416
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2417
                UploadId=upload_id,
2418
            )
2419
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
2420
            raise InvalidArgument(
1✔
2421
                "Part number must be an integer between 1 and 10000, inclusive",
2422
                ArgumentName="partNumber",
2423
                ArgumentValue=part_number,
2424
            )
2425

2426
        if content_md5 := request.get("ContentMD5"):
1✔
2427
            # assert that the received ContentMD5 is a properly base64-encoded value that fits an MD5 hash length
2428
            if not base_64_content_md5_to_etag(content_md5):
1✔
2429
                raise InvalidDigest(
1✔
2430
                    "The Content-MD5 you specified was invalid.",
2431
                    Content_MD5=content_md5,
2432
                )
2433

2434
        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
1✔
2435
        checksum_value = (
1✔
2436
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
2437
        )
2438

2439
        # TODO: we're not encrypting the object with the provided key for now
2440
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2441
        validate_sse_c(
1✔
2442
            algorithm=request.get("SSECustomerAlgorithm"),
2443
            encryption_key=request.get("SSECustomerKey"),
2444
            encryption_key_md5=sse_c_key_md5,
2445
        )
2446

2447
        if (s3_multipart.object.sse_key_hash and not sse_c_key_md5) or (
1✔
2448
            sse_c_key_md5 and not s3_multipart.object.sse_key_hash
2449
        ):
2450
            raise InvalidRequest(
1✔
2451
                "The multipart upload initiate requested encryption. "
2452
                "Subsequent part requests must include the appropriate encryption parameters."
2453
            )
2454
        elif (
1✔
2455
            s3_multipart.object.sse_key_hash
2456
            and sse_c_key_md5
2457
            and s3_multipart.object.sse_key_hash != sse_c_key_md5
2458
        ):
2459
            raise InvalidRequest(
1✔
2460
                "The provided encryption parameters did not match the ones used originally."
2461
            )
2462

2463
        s3_part = S3Part(
1✔
2464
            part_number=part_number,
2465
            checksum_algorithm=checksum_algorithm,
2466
            checksum_value=checksum_value,
2467
        )
2468
        body = request.get("Body")
1✔
2469
        headers = context.request.headers
1✔
2470
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
1✔
2471
            "STREAMING-"
2472
        ) or "aws-chunked" in headers.get("content-encoding", "")
2473
        # check if chunked request
2474
        if is_aws_chunked:
1✔
2475
            checksum_algorithm = (
1✔
2476
                checksum_algorithm
2477
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
2478
            )
2479
            if checksum_algorithm:
1✔
UNCOV
2480
                s3_part.checksum_algorithm = checksum_algorithm
×
2481

2482
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
1✔
2483
            body = AwsChunkedDecoder(body, decoded_content_length, s3_part)
1✔
2484

2485
        if (
1✔
2486
            s3_multipart.checksum_algorithm
2487
            and s3_part.checksum_algorithm != s3_multipart.checksum_algorithm
2488
        ):
2489
            error_req_checksum = checksum_algorithm.lower() if checksum_algorithm else "null"
1✔
2490
            error_mp_checksum = (
1✔
2491
                s3_multipart.object.checksum_algorithm.lower()
2492
                if s3_multipart.object.checksum_algorithm
2493
                else "null"
2494
            )
2495
            if error_mp_checksum != "null":
1✔
2496
                raise InvalidRequest(
1✔
2497
                    f"Checksum Type mismatch occurred, expected checksum Type: {error_mp_checksum}, actual checksum Type: {error_req_checksum}"
2498
                )
2499

2500
        stored_multipart = self._storage_backend.get_multipart(bucket_name, s3_multipart)
1✔
2501
        with stored_multipart.open(s3_part, mode="w") as stored_s3_part:
1✔
2502
            try:
1✔
2503
                stored_s3_part.write(body)
1✔
2504
            except Exception:
1✔
2505
                stored_multipart.remove_part(s3_part)
1✔
2506
                raise
1✔
2507

2508
            if checksum_algorithm:
1✔
2509
                if not validate_checksum_value(s3_part.checksum_value, checksum_algorithm):
1✔
2510
                    stored_multipart.remove_part(s3_part)
1✔
2511
                    raise InvalidRequest(
1✔
2512
                        f"Value for x-amz-checksum-{s3_part.checksum_algorithm.lower()} header is invalid."
2513
                    )
2514
                elif s3_part.checksum_value != stored_s3_part.checksum:
1✔
2515
                    stored_multipart.remove_part(s3_part)
1✔
2516
                    raise BadDigest(
1✔
2517
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
2518
                    )
2519

2520
            if content_md5:
1✔
2521
                calculated_md5 = etag_to_base_64_content_md5(s3_part.etag)
1✔
2522
                if calculated_md5 != content_md5:
1✔
2523
                    stored_multipart.remove_part(s3_part)
1✔
2524
                    raise BadDigest(
1✔
2525
                        "The Content-MD5 you specified did not match what we received.",
2526
                        ExpectedDigest=content_md5,
2527
                        CalculatedDigest=calculated_md5,
2528
                    )
2529

2530
            s3_multipart.parts[str(part_number)] = s3_part
1✔
2531

2532
        response = UploadPartOutput(
1✔
2533
            ETag=s3_part.quoted_etag,
2534
        )
2535

2536
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2537
        if sse_c_key_md5:
1✔
2538
            response["SSECustomerAlgorithm"] = "AES256"
1✔
2539
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
2540

2541
        if s3_part.checksum_algorithm:
1✔
2542
            response[f"Checksum{s3_part.checksum_algorithm.upper()}"] = s3_part.checksum_value
1✔
2543

2544
        # TODO: RequestCharged: Optional[RequestCharged]
2545
        return response
1✔
2546
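    # Hedged usage sketch, continuing the client and upload_id from the sketch above:
    # uploading a single part. On AWS every part except the last must be at least 5 MiB;
    # the returned ETag is collected for CompleteMultipartUpload.
    #
    #   part = s3.upload_part(
    #       Bucket="demo-bucket", Key="big-object.bin", UploadId=upload_id,
    #       PartNumber=1, Body=b"0" * 5 * 1024 * 1024,
    #   )
    #   parts = [{"PartNumber": 1, "ETag": part["ETag"]}]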

2547
    @handler("UploadPartCopy", expand=False)
1✔
2548
    def upload_part_copy(
1✔
2549
        self,
2550
        context: RequestContext,
2551
        request: UploadPartCopyRequest,
2552
    ) -> UploadPartCopyOutput:
2553
        # TODO: handle following parameters:
2554
        #  SSECustomerAlgorithm: Optional[SSECustomerAlgorithm]
2555
        #  SSECustomerKey: Optional[SSECustomerKey]
2556
        #  SSECustomerKeyMD5: Optional[SSECustomerKeyMD5]
2557
        #  CopySourceSSECustomerAlgorithm: Optional[CopySourceSSECustomerAlgorithm]
2558
        #  CopySourceSSECustomerKey: Optional[CopySourceSSECustomerKey]
2559
        #  CopySourceSSECustomerKeyMD5: Optional[CopySourceSSECustomerKeyMD5]
2560
        #  RequestPayer: Optional[RequestPayer]
2561
        #  ExpectedBucketOwner: Optional[AccountId]
2562
        #  ExpectedSourceBucketOwner: Optional[AccountId]
2563
        dest_bucket = request["Bucket"]
1✔
2564
        dest_key = request["Key"]
1✔
2565
        store = self.get_store(context.account_id, context.region)
1✔
2566
        # TODO: validate cross-account UploadPartCopy
2567
        if not (dest_s3_bucket := store.buckets.get(dest_bucket)):
1✔
UNCOV
2568
            raise NoSuchBucket("The specified bucket does not exist", BucketName=dest_bucket)
×
2569

2570
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
1✔
2571
            request.get("CopySource")
2572
        )
2573

2574
        if not (src_s3_bucket := store.buckets.get(src_bucket)):
1✔
UNCOV
2575
            raise NoSuchBucket("The specified bucket does not exist", BucketName=src_bucket)
×
2576

2577
        # if the object is a delete marker, get_object will raise NotFound if no versionId is given, like AWS
2578
        try:
1✔
2579
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
1✔
UNCOV
2580
        except MethodNotAllowed:
×
UNCOV
2581
            raise InvalidRequest(
×
2582
                "The source of a copy request may not specifically refer to a delete marker by version id."
2583
            )
2584

2585
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
1✔
UNCOV
2586
            raise InvalidObjectState(
×
2587
                "Operation is not valid for the source object's storage class",
2588
                StorageClass=src_s3_object.storage_class,
2589
            )
2590

2591
        upload_id = request.get("UploadId")
1✔
2592
        if (
1✔
2593
            not (s3_multipart := dest_s3_bucket.multiparts.get(upload_id))
2594
            or s3_multipart.object.key != dest_key
2595
        ):
UNCOV
2596
            raise NoSuchUpload(
×
2597
                "The specified upload does not exist. "
2598
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2599
                UploadId=upload_id,
2600
            )
2601

2602
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
UNCOV
2603
            raise InvalidArgument(
×
2604
                "Part number must be an integer between 1 and 10000, inclusive",
2605
                ArgumentName="partNumber",
2606
                ArgumentValue=part_number,
2607
            )
2608

2609
        source_range = request.get("CopySourceRange")
1✔
2610
        # TODO implement copy source IF
2611

2612
        range_data: ObjectRange | None = None
1✔
2613
        if source_range:
1✔
2614
            range_data = parse_copy_source_range_header(source_range, src_s3_object.size)
1✔
2615

2616
        if precondition := get_failed_upload_part_copy_source_preconditions(
1✔
2617
            request, src_s3_object.last_modified, src_s3_object.etag
2618
        ):
2619
            raise PreconditionFailed(
1✔
2620
                "At least one of the pre-conditions you specified did not hold",
2621
                Condition=precondition,
2622
            )
2623

2624
        s3_part = S3Part(part_number=part_number)
1✔
2625
        if s3_multipart.checksum_algorithm:
1✔
2626
            s3_part.checksum_algorithm = s3_multipart.checksum_algorithm
1✔
2627

2628
        stored_multipart = self._storage_backend.get_multipart(dest_bucket, s3_multipart)
1✔
2629
        stored_multipart.copy_from_object(s3_part, src_bucket, src_s3_object, range_data)
1✔
2630

2631
        s3_multipart.parts[str(part_number)] = s3_part
1✔
2632

2633
        # TODO: return those fields
2634
        #     RequestCharged: Optional[RequestCharged]
2635

2636
        result = CopyPartResult(
1✔
2637
            ETag=s3_part.quoted_etag,
2638
            LastModified=s3_part.last_modified,
2639
        )
2640

2641
        response = UploadPartCopyOutput(
1✔
2642
            CopyPartResult=result,
2643
        )
2644

2645
        if src_s3_bucket.versioning_status and src_s3_object.version_id:
1✔
UNCOV
2646
            response["CopySourceVersionId"] = src_s3_object.version_id
×
2647

2648
        if s3_part.checksum_algorithm:
1✔
2649
            result[f"Checksum{s3_part.checksum_algorithm.upper()}"] = s3_part.checksum_value
1✔
2650

2651
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2652

2653
        return response
1✔
2654
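    # Hedged usage sketch, continuing from above: copying a byte range of an existing
    # object into part 2. The source bucket/key and range are illustrative assumptions.
    #
    #   copy = s3.upload_part_copy(
    #       Bucket="demo-bucket", Key="big-object.bin", UploadId=upload_id, PartNumber=2,
    #       CopySource={"Bucket": "src-bucket", "Key": "source.bin"},
    #       CopySourceRange="bytes=0-5242879",
    #   )
    #   parts.append({"PartNumber": 2, "ETag": copy["CopyPartResult"]["ETag"]})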

2655
    def complete_multipart_upload(
1✔
2656
        self,
2657
        context: RequestContext,
2658
        bucket: BucketName,
2659
        key: ObjectKey,
2660
        upload_id: MultipartUploadId,
2661
        multipart_upload: CompletedMultipartUpload = None,
2662
        checksum_crc32: ChecksumCRC32 = None,
2663
        checksum_crc32_c: ChecksumCRC32C = None,
2664
        checksum_crc64_nvme: ChecksumCRC64NVME = None,
2665
        checksum_sha1: ChecksumSHA1 = None,
2666
        checksum_sha256: ChecksumSHA256 = None,
2667
        checksum_type: ChecksumType = None,
2668
        mpu_object_size: MpuObjectSize = None,
2669
        request_payer: RequestPayer = None,
2670
        expected_bucket_owner: AccountId = None,
2671
        if_match: IfMatch = None,
2672
        if_none_match: IfNoneMatch = None,
2673
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2674
        sse_customer_key: SSECustomerKey = None,
2675
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2676
        **kwargs,
2677
    ) -> CompleteMultipartUploadOutput:
2678
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2679

2680
        if (
1✔
2681
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2682
            or s3_multipart.object.key != key
2683
        ):
2684
            raise NoSuchUpload(
1✔
2685
                "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
2686
                UploadId=upload_id,
2687
            )
2688

2689
        if if_none_match and if_match:
1✔
2690
            raise NotImplementedException(
2691
                "A header you provided implies functionality that is not implemented",
2692
                Header="If-Match,If-None-Match",
2693
                additionalMessage="Multiple conditional request headers present in the request",
2694
            )
2695

2696
        elif if_none_match:
1✔
2697
            # TODO: improve concurrency mechanism for `if_none_match` and `if_match`
2698
            if if_none_match != "*":
1✔
2699
                raise NotImplementedException(
2700
                    "A header you provided implies functionality that is not implemented",
2701
                    Header="If-None-Match",
2702
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2703
                )
2704
            if object_exists_for_precondition_write(s3_bucket, key):
1✔
2705
                raise PreconditionFailed(
1✔
2706
                    "At least one of the pre-conditions you specified did not hold",
2707
                    Condition="If-None-Match",
2708
                )
2709
            elif s3_multipart.precondition:
1✔
2710
                raise ConditionalRequestConflict(
1✔
2711
                    "The conditional request cannot succeed due to a conflicting operation against this resource.",
2712
                    Condition="If-None-Match",
2713
                    Key=key,
2714
                )
2715

2716
        elif if_match:
1✔
2717
            if if_match == "*":
1✔
2718
                raise NotImplementedException(
2719
                    "A header you provided implies functionality that is not implemented",
2720
                    Header="If-None-Match",
2721
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2722
                )
2723
            verify_object_equality_precondition_write(
1✔
2724
                s3_bucket, key, if_match, initiated=s3_multipart.initiated
2725
            )
2726

2727
        parts = multipart_upload.get("Parts", [])
1✔
2728
        if not parts:
1✔
2729
            raise InvalidRequest("You must specify at least one part")
1✔
2730

2731
        parts_numbers = [part.get("PartNumber") for part in parts]
1✔
2732
        # TODO: it seems that with new S3 data integrity, sorting might not be mandatory depending on checksum type
2733
        # see https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
2734
        # sorted is very fast (fastest) if the list is already sorted, which should be the case
2735
        if sorted(parts_numbers) != parts_numbers:
1✔
2736
            raise InvalidPartOrder(
1✔
2737
                "The list of parts was not in ascending order. Parts must be ordered by part number.",
2738
                UploadId=upload_id,
2739
            )
2740

2741
        mpu_checksum_algorithm = s3_multipart.checksum_algorithm
1✔
2742
        mpu_checksum_type = s3_multipart.checksum_type
1✔
2743

2744
        if checksum_type and checksum_type != mpu_checksum_type:
1✔
2745
            raise InvalidRequest(
1✔
2746
                f"The upload was created using the {mpu_checksum_type or 'null'} checksum mode. "
2747
                f"The complete request must use the same checksum mode."
2748
            )
2749

2750
        # generate the versionId before completing, in case the bucket versioning status has changed between
2751
        # creation and completion (this behavior is validated against AWS)
2752
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
2753
        s3_multipart.object.version_id = version_id
1✔
2754

2755
        # we're inspecting the signature of `complete_multipart`, in case the multipart has been restored from
2756
        # persistence from an older version. if the method does not have the new signature, do not validate those parameters
2757
        # TODO: remove for next major version (minor?)
2758
        if signature(s3_multipart.complete_multipart).parameters.get("mpu_size"):
1✔
2759
            checksum_algorithm = mpu_checksum_algorithm.lower() if mpu_checksum_algorithm else None
1✔
2760
            checksum_map = {
1✔
2761
                "crc32": checksum_crc32,
2762
                "crc32c": checksum_crc32_c,
2763
                "crc64nvme": checksum_crc64_nvme,
2764
                "sha1": checksum_sha1,
2765
                "sha256": checksum_sha256,
2766
            }
2767
            checksum_value = checksum_map.get(checksum_algorithm)
1✔
2768
            s3_multipart.complete_multipart(
1✔
2769
                parts, mpu_size=mpu_object_size, validation_checksum=checksum_value
2770
            )
2771
            if mpu_checksum_algorithm and (
1✔
2772
                (
2773
                    checksum_value
2774
                    and mpu_checksum_type == ChecksumType.FULL_OBJECT
2775
                    and not checksum_type
2776
                )
2777
                or any(
2778
                    checksum_value
2779
                    for checksum_type, checksum_value in checksum_map.items()
2780
                    if checksum_type != checksum_algorithm
2781
                )
2782
            ):
2783
                # this is not ideal, but this validation comes last... after the validation of individual parts
2784
                s3_multipart.object.parts.clear()
1✔
2785
                raise BadDigest(
1✔
2786
                    f"The {mpu_checksum_algorithm.lower()} you specified did not match the calculated checksum."
2787
                )
2788
        else:
UNCOV
2789
            s3_multipart.complete_multipart(parts)
×
2790

2791
        stored_multipart = self._storage_backend.get_multipart(bucket, s3_multipart)
1✔
2792
        stored_multipart.complete_multipart(
1✔
2793
            [s3_multipart.parts.get(str(part_number)) for part_number in parts_numbers]
2794
        )
2795
        if not s3_multipart.checksum_algorithm and s3_multipart.object.checksum_algorithm:
1✔
2796
            with self._storage_backend.open(
1✔
2797
                bucket, s3_multipart.object, mode="r"
2798
            ) as s3_stored_object:
2799
                s3_multipart.object.checksum_value = s3_stored_object.checksum
1✔
2800
                s3_multipart.object.checksum_type = ChecksumType.FULL_OBJECT
1✔
2801

2802
        s3_object = s3_multipart.object
1✔
2803

2804
        s3_bucket.objects.set(key, s3_object)
1✔
2805

2806
        # remove the multipart now that it's complete
2807
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2808
        s3_bucket.multiparts.pop(s3_multipart.id, None)
1✔
2809

2810
        key_id = get_unique_key_id(bucket, key, version_id)
1✔
2811
        store.TAGS.tags.pop(key_id, None)
1✔
2812
        if s3_multipart.tagging:
1✔
UNCOV
2813
            store.TAGS.tags[key_id] = s3_multipart.tagging
×
2814

2815
        # RequestCharged: Optional[RequestCharged] TODO
2816

2817
        response = CompleteMultipartUploadOutput(
1✔
2818
            Bucket=bucket,
2819
            Key=key,
2820
            ETag=s3_object.quoted_etag,
2821
            Location=get_url_encoded_object_location(bucket, key),
2822
        )
2823

2824
        if s3_object.version_id:
1✔
UNCOV
2825
            response["VersionId"] = s3_object.version_id
×
2826

2827
        # it seems AWS is not returning checksum related fields if the object has KMS encryption ¯\_(ツ)_/¯
2828
        # but it still generates them, and they can be retrieved with regular GetObject and such operations
2829
        if s3_object.checksum_algorithm and not s3_object.kms_key_id:
1✔
2830
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
2831
            response["ChecksumType"] = s3_object.checksum_type
1✔
2832

2833
        if s3_object.expiration:
1✔
UNCOV
2834
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
2835

2836
        add_encryption_to_response(response, s3_object=s3_object)
1✔
2837

2838
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
2839

2840
        return response
1✔
2841
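    # Hedged usage sketch, continuing from above: completing the upload with the collected
    # part numbers and ETags, which must be listed in ascending part-number order.
    #
    #   done = s3.complete_multipart_upload(
    #       Bucket="demo-bucket", Key="big-object.bin", UploadId=upload_id,
    #       MultipartUpload={"Parts": parts},
    #   )
    #   print(done["ETag"], done.get("VersionId"))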

2842
    def abort_multipart_upload(
1✔
2843
        self,
2844
        context: RequestContext,
2845
        bucket: BucketName,
2846
        key: ObjectKey,
2847
        upload_id: MultipartUploadId,
2848
        request_payer: RequestPayer = None,
2849
        expected_bucket_owner: AccountId = None,
2850
        if_match_initiated_time: IfMatchInitiatedTime = None,
2851
        **kwargs,
2852
    ) -> AbortMultipartUploadOutput:
2853
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2854

2855
        if (
1✔
2856
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2857
            or s3_multipart.object.key != key
2858
        ):
2859
            raise NoSuchUpload(
1✔
2860
                "The specified upload does not exist. "
2861
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2862
                UploadId=upload_id,
2863
            )
2864
        s3_bucket.multiparts.pop(upload_id, None)
1✔
2865

2866
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2867
        response = AbortMultipartUploadOutput()
1✔
2868
        # TODO: requestCharged
2869
        return response
1✔
2870
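    # Hedged usage sketch: aborting an in-progress upload instead of completing it removes
    # the multipart and its stored parts (names reused from the sketches above).
    #
    #   s3.abort_multipart_upload(
    #       Bucket="demo-bucket", Key="big-object.bin", UploadId=upload_id
    #   )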

2871
    def list_parts(
1✔
2872
        self,
2873
        context: RequestContext,
2874
        bucket: BucketName,
2875
        key: ObjectKey,
2876
        upload_id: MultipartUploadId,
2877
        max_parts: MaxParts = None,
2878
        part_number_marker: PartNumberMarker = None,
2879
        request_payer: RequestPayer = None,
2880
        expected_bucket_owner: AccountId = None,
2881
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2882
        sse_customer_key: SSECustomerKey = None,
2883
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2884
        **kwargs,
2885
    ) -> ListPartsOutput:
2886
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2887

2888
        if (
1✔
2889
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2890
            or s3_multipart.object.key != key
2891
        ):
2892
            raise NoSuchUpload(
1✔
2893
                "The specified upload does not exist. "
2894
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2895
                UploadId=upload_id,
2896
            )
2897

2898
        count = 0
1✔
2899
        is_truncated = False
1✔
2900
        part_number_marker = part_number_marker or 0
1✔
2901
        max_parts = max_parts or 1000
1✔
2902

2903
        parts = []
1✔
2904
        all_parts = sorted(
1✔
2905
            (int(part_number), part) for part_number, part in s3_multipart.parts.items()
2906
        )
2907
        last_part_number = all_parts[-1][0] if all_parts else None
1✔
2908
        for part_number, part in all_parts:
1✔
2909
            if part_number <= part_number_marker:
1✔
2910
                continue
1✔
2911
            part_item = Part(
1✔
2912
                ETag=part.quoted_etag,
2913
                LastModified=part.last_modified,
2914
                PartNumber=part_number,
2915
                Size=part.size,
2916
            )
2917
            if s3_multipart.checksum_algorithm and part.checksum_algorithm:
1✔
2918
                part_item[f"Checksum{part.checksum_algorithm.upper()}"] = part.checksum_value
1✔
2919

2920
            parts.append(part_item)
1✔
2921
            count += 1
1✔
2922

2923
            if count >= max_parts and part.part_number != last_part_number:
1✔
2924
                is_truncated = True
1✔
2925
                break
1✔
2926

2927
        response = ListPartsOutput(
1✔
2928
            Bucket=bucket,
2929
            Key=key,
2930
            UploadId=upload_id,
2931
            Initiator=s3_multipart.initiator,
2932
            Owner=s3_multipart.initiator,
2933
            StorageClass=s3_multipart.object.storage_class,
2934
            IsTruncated=is_truncated,
2935
            MaxParts=max_parts,
2936
            PartNumberMarker=0,
2937
            NextPartNumberMarker=0,
2938
        )
2939
        if parts:
1✔
2940
            response["Parts"] = parts
1✔
2941
            last_part = parts[-1]["PartNumber"]
1✔
2942
            response["NextPartNumberMarker"] = last_part
1✔
2943

2944
        if part_number_marker:
1✔
2945
            response["PartNumberMarker"] = part_number_marker
1✔
2946
        if s3_multipart.checksum_algorithm:
1✔
2947
            response["ChecksumAlgorithm"] = s3_multipart.object.checksum_algorithm
1✔
2948
            response["ChecksumType"] = s3_multipart.checksum_type
1✔
2949

2950
        #     AbortDate: Optional[AbortDate] TODO: lifecycle
2951
        #     AbortRuleId: Optional[AbortRuleId] TODO: lifecycle
2952
        #     RequestCharged: Optional[RequestCharged]
2953

2954
        return response
1✔
2955
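    # Hedged usage sketch: paginating parts with MaxParts and PartNumberMarker, mirroring
    # the truncation logic above (names reused from the sketches above).
    #
    #   page = s3.list_parts(
    #       Bucket="demo-bucket", Key="big-object.bin", UploadId=upload_id, MaxParts=1
    #   )
    #   while page["IsTruncated"]:
    #       page = s3.list_parts(
    #           Bucket="demo-bucket", Key="big-object.bin", UploadId=upload_id,
    #           MaxParts=1, PartNumberMarker=page["NextPartNumberMarker"],
    #       )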

2956
    def list_multipart_uploads(
1✔
2957
        self,
2958
        context: RequestContext,
2959
        bucket: BucketName,
2960
        delimiter: Delimiter = None,
2961
        encoding_type: EncodingType = None,
2962
        key_marker: KeyMarker = None,
2963
        max_uploads: MaxUploads = None,
2964
        prefix: Prefix = None,
2965
        upload_id_marker: UploadIdMarker = None,
2966
        expected_bucket_owner: AccountId = None,
2967
        request_payer: RequestPayer = None,
2968
        **kwargs,
2969
    ) -> ListMultipartUploadsOutput:
2970
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2971

2972
        common_prefixes = set()
1✔
2973
        count = 0
1✔
2974
        is_truncated = False
1✔
2975
        max_uploads = max_uploads or 1000
1✔
2976
        prefix = prefix or ""
1✔
2977
        delimiter = delimiter or ""
1✔
2978
        if encoding_type:
1✔
2979
            prefix = urlparse.quote(prefix)
1✔
2980
            delimiter = urlparse.quote(delimiter)
1✔
2981
        upload_id_marker_found = False
1✔
2982

2983
        if key_marker and upload_id_marker:
1✔
2984
            multipart = s3_bucket.multiparts.get(upload_id_marker)
1✔
2985
            if multipart:
1✔
2986
                key = (
1✔
2987
                    urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
2988
                )
2989
            else:
2990
                # set key to None so it fails if the multipart is not found
UNCOV
2991
                key = None
×
2992

2993
            if key_marker != key:
1✔
2994
                raise InvalidArgument(
1✔
2995
                    "Invalid uploadId marker",
2996
                    ArgumentName="upload-id-marker",
2997
                    ArgumentValue=upload_id_marker,
2998
                )
2999

3000
        uploads = []
1✔
3001
        # sort by key and initiated
3002
        all_multiparts = sorted(
1✔
3003
            s3_bucket.multiparts.values(), key=lambda r: (r.object.key, r.initiated.timestamp())
3004
        )
3005
        last_multipart = all_multiparts[-1] if all_multiparts else None
1✔
3006

3007
        for multipart in all_multiparts:
1✔
3008
            key = urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
1✔
3009
            # skip all keys up to the key_marker (and, for that key, uploads up to the upload-id marker)
3010
            if key_marker:
1✔
3011
                if key < key_marker:
1✔
3012
                    continue
1✔
3013
                elif key == key_marker:
1✔
3014
                    if not upload_id_marker:
1✔
3015
                        continue
1✔
3016
                    # as uploads for the same key are ordered by initiation time, once we find the upload-id marker, we can return the following ones
3017
                    if multipart.id == upload_id_marker:
1✔
3018
                        upload_id_marker_found = True
1✔
3019
                        continue
1✔
3020
                    elif not upload_id_marker_found:
1✔
3021
                        # as long as we have not passed the upload-id marker, skip the uploads
3022
                        continue
1✔
3023

3024
            # Filter for keys that start with prefix
3025
            if prefix and not key.startswith(prefix):
1✔
3026
                continue
1✔
3027

3028
            # see ListObjectsV2 for the logic comments (shared logic here)
3029
            prefix_including_delimiter = None
1✔
3030
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
3031
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
3032
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
3033

3034
                if prefix_including_delimiter in common_prefixes or (
1✔
3035
                    key_marker and key_marker.startswith(prefix_including_delimiter)
3036
                ):
3037
                    continue
1✔
3038

3039
            if prefix_including_delimiter:
1✔
3040
                common_prefixes.add(prefix_including_delimiter)
1✔
3041
            else:
3042
                multipart_upload = MultipartUpload(
1✔
3043
                    UploadId=multipart.id,
3044
                    Key=multipart.object.key,
3045
                    Initiated=multipart.initiated,
3046
                    StorageClass=multipart.object.storage_class,
3047
                    Owner=multipart.initiator,  # TODO: check the difference
3048
                    Initiator=multipart.initiator,
3049
                )
3050
                if multipart.checksum_algorithm:
1✔
3051
                    multipart_upload["ChecksumAlgorithm"] = multipart.checksum_algorithm
1✔
3052
                    multipart_upload["ChecksumType"] = multipart.checksum_type
1✔
3053

3054
                uploads.append(multipart_upload)
1✔
3055

3056
            count += 1
1✔
3057
            if count >= max_uploads and last_multipart.id != multipart.id:
1✔
3058
                is_truncated = True
1✔
3059
                break
1✔
3060

3061
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
3062

3063
        response = ListMultipartUploadsOutput(
1✔
3064
            Bucket=bucket,
3065
            IsTruncated=is_truncated,
3066
            MaxUploads=max_uploads or 1000,
3067
            KeyMarker=key_marker or "",
3068
            UploadIdMarker=upload_id_marker or "" if key_marker else "",
3069
            NextKeyMarker="",
3070
            NextUploadIdMarker="",
3071
        )
3072
        if uploads:
1✔
3073
            response["Uploads"] = uploads
1✔
3074
            last_upload = uploads[-1]
1✔
3075
            response["NextKeyMarker"] = last_upload["Key"]
1✔
3076
            response["NextUploadIdMarker"] = last_upload["UploadId"]
1✔
3077
        if delimiter:
1✔
3078
            response["Delimiter"] = delimiter
1✔
3079
        if prefix:
1✔
3080
            response["Prefix"] = prefix
1✔
3081
        if encoding_type:
1✔
3082
            response["EncodingType"] = EncodingType.url
1✔
3083
        if common_prefixes:
1✔
3084
            response["CommonPrefixes"] = common_prefixes
1✔
3085

3086
        return response
1✔
3087
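    # Hedged usage sketch: listing in-progress uploads, optionally grouped with a
    # Delimiter the same way ListObjectsV2 groups keys (prefix is an illustrative
    # assumption).
    #
    #   listing = s3.list_multipart_uploads(
    #       Bucket="demo-bucket", Prefix="videos/", Delimiter="/"
    #   )
    #   for upload in listing.get("Uploads", []):
    #       print(upload["Key"], upload["UploadId"], upload["Initiated"])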

3088
    def put_bucket_versioning(
1✔
3089
        self,
3090
        context: RequestContext,
3091
        bucket: BucketName,
3092
        versioning_configuration: VersioningConfiguration,
3093
        content_md5: ContentMD5 = None,
3094
        checksum_algorithm: ChecksumAlgorithm = None,
3095
        mfa: MFA = None,
3096
        expected_bucket_owner: AccountId = None,
3097
        **kwargs,
3098
    ) -> None:
3099
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3100
        if not (versioning_status := versioning_configuration.get("Status")):
1✔
3101
            raise CommonServiceException(
1✔
3102
                code="IllegalVersioningConfigurationException",
3103
                message="The Versioning element must be specified",
3104
            )
3105

3106
        if versioning_status not in ("Enabled", "Suspended"):
1✔
3107
            raise MalformedXML()
1✔
3108

3109
        if s3_bucket.object_lock_enabled and versioning_status == "Suspended":
1✔
3110
            raise InvalidBucketState(
1✔
3111
                "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed."
3112
            )
3113

3114
        if not s3_bucket.versioning_status:
1✔
3115
            s3_bucket.objects = VersionedKeyStore.from_key_store(s3_bucket.objects)
1✔
3116

3117
        s3_bucket.versioning_status = versioning_status
1✔
3118
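    # Hedged usage sketch: enabling versioning on a bucket (names are illustrative
    # assumptions). As implemented above, switching to Suspended is rejected while
    # Object Lock is enabled on the bucket.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   s3.put_bucket_versioning(
    #       Bucket="demo-bucket", VersioningConfiguration={"Status": "Enabled"}
    #   )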

3119
    def get_bucket_versioning(
1✔
3120
        self,
3121
        context: RequestContext,
3122
        bucket: BucketName,
3123
        expected_bucket_owner: AccountId = None,
3124
        **kwargs,
3125
    ) -> GetBucketVersioningOutput:
3126
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3127

3128
        if not s3_bucket.versioning_status:
1✔
3129
            return GetBucketVersioningOutput()
1✔
3130

3131
        return GetBucketVersioningOutput(Status=s3_bucket.versioning_status)
1✔
3132

3133
    def get_bucket_encryption(
1✔
3134
        self,
3135
        context: RequestContext,
3136
        bucket: BucketName,
3137
        expected_bucket_owner: AccountId = None,
3138
        **kwargs,
3139
    ) -> GetBucketEncryptionOutput:
3140
        # AWS now encrypts buckets by default with AES256, see:
3141
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/default-bucket-encryption.html
3142
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3143

3144
        if not s3_bucket.encryption_rule:
1✔
UNCOV
3145
            return GetBucketEncryptionOutput()
×
3146

3147
        return GetBucketEncryptionOutput(
1✔
3148
            ServerSideEncryptionConfiguration={"Rules": [s3_bucket.encryption_rule]}
3149
        )
3150

3151
    def put_bucket_encryption(
1✔
3152
        self,
3153
        context: RequestContext,
3154
        bucket: BucketName,
3155
        server_side_encryption_configuration: ServerSideEncryptionConfiguration,
3156
        content_md5: ContentMD5 = None,
3157
        checksum_algorithm: ChecksumAlgorithm = None,
3158
        expected_bucket_owner: AccountId = None,
3159
        **kwargs,
3160
    ) -> None:
3161
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3162

3163
        if not (rules := server_side_encryption_configuration.get("Rules")):
1✔
3164
            raise MalformedXML()
1✔
3165

3166
        if len(rules) != 1 or not (
1✔
3167
            encryption := rules[0].get("ApplyServerSideEncryptionByDefault")
3168
        ):
3169
            raise MalformedXML()
1✔
3170

3171
        if not (sse_algorithm := encryption.get("SSEAlgorithm")):
1✔
UNCOV
3172
            raise MalformedXML()
×
3173

3174
        if sse_algorithm not in SSE_ALGORITHMS:
1✔
UNCOV
3175
            raise MalformedXML()
×
3176

3177
        if sse_algorithm != ServerSideEncryption.aws_kms and "KMSMasterKeyID" in encryption:
1✔
3178
            raise InvalidArgument(
1✔
3179
                "a KMSMasterKeyID is not applicable if the default sse algorithm is not aws:kms or aws:kms:dsse",
3180
                ArgumentName="ApplyServerSideEncryptionByDefault",
3181
            )
3182
        # elif master_kms_key := encryption.get("KMSMasterKeyID"):
3183
        # TODO: validate KMS key? not currently done in moto
3184
        # You can pass either the KeyId or the KeyArn. If cross-account, it has to be the ARN.
3185
        # It's always saved as the ARN in the bucket configuration.
3186
        # kms_key_arn = get_kms_key_arn(master_kms_key, s3_bucket.bucket_account_id)
3187
        # encryption["KMSMasterKeyID"] = master_kms_key
3188

3189
        s3_bucket.encryption_rule = rules[0]
1✔
3190
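    # Hedged usage sketch: setting a default SSE-KMS encryption rule. The key alias is an
    # illustrative assumption; per the validation above, KMSMasterKeyID is only accepted
    # together with aws:kms or aws:kms:dsse.
    #
    #   s3.put_bucket_encryption(
    #       Bucket="demo-bucket",
    #       ServerSideEncryptionConfiguration={
    #           "Rules": [{
    #               "ApplyServerSideEncryptionByDefault": {
    #                   "SSEAlgorithm": "aws:kms", "KMSMasterKeyID": "alias/my-key",
    #               },
    #               "BucketKeyEnabled": True,
    #           }]
    #       },
    #   )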

3191
    def delete_bucket_encryption(
1✔
3192
        self,
3193
        context: RequestContext,
3194
        bucket: BucketName,
3195
        expected_bucket_owner: AccountId = None,
3196
        **kwargs,
3197
    ) -> None:
3198
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3199

3200
        s3_bucket.encryption_rule = None
1✔
3201

3202
    def put_bucket_notification_configuration(
1✔
3203
        self,
3204
        context: RequestContext,
3205
        bucket: BucketName,
3206
        notification_configuration: NotificationConfiguration,
3207
        expected_bucket_owner: AccountId = None,
3208
        skip_destination_validation: SkipValidation = None,
3209
        **kwargs,
3210
    ) -> None:
3211
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3212

3213
        self._verify_notification_configuration(
1✔
3214
            notification_configuration, skip_destination_validation, context, bucket
3215
        )
3216
        s3_bucket.notification_configuration = notification_configuration
1✔
3217

3218
    def get_bucket_notification_configuration(
1✔
3219
        self,
3220
        context: RequestContext,
3221
        bucket: BucketName,
3222
        expected_bucket_owner: AccountId = None,
3223
        **kwargs,
3224
    ) -> NotificationConfiguration:
3225
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3226

3227
        return s3_bucket.notification_configuration or NotificationConfiguration()
1✔
3228

3229
    def put_bucket_tagging(
1✔
3230
        self,
3231
        context: RequestContext,
3232
        bucket: BucketName,
3233
        tagging: Tagging,
3234
        content_md5: ContentMD5 = None,
3235
        checksum_algorithm: ChecksumAlgorithm = None,
3236
        expected_bucket_owner: AccountId = None,
3237
        **kwargs,
3238
    ) -> None:
3239
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3240

3241
        if "TagSet" not in tagging:
1✔
UNCOV
3242
            raise MalformedXML()
×
3243

3244
        tag_set = tagging["TagSet"] or []
1✔
3245
        validate_tag_set(tag_set, type_set="bucket")
1✔
3246

3247
        # remove the previous tags before setting the new ones; the request overwrites the whole TagSet
3248
        self._remove_all_bucket_tags(
1✔
3249
            s3_bucket.bucket_arn, context.account_id, s3_bucket.bucket_region
3250
        )
3251
        self._create_bucket_tags(
1✔
3252
            s3_bucket.bucket_arn, context.account_id, s3_bucket.bucket_region, tag_set
3253
        )
3254
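    # Hedged usage sketch: PutBucketTagging replaces the whole TagSet, matching the
    # remove-then-create logic above (tag values are illustrative assumptions).
    #
    #   s3.put_bucket_tagging(
    #       Bucket="demo-bucket",
    #       Tagging={"TagSet": [{"Key": "env", "Value": "test"}]},
    #   )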

3255
    def get_bucket_tagging(
1✔
3256
        self,
3257
        context: RequestContext,
3258
        bucket: BucketName,
3259
        expected_bucket_owner: AccountId = None,
3260
        **kwargs,
3261
    ) -> GetBucketTaggingOutput:
3262
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3263
        tag_set = self._list_bucket_tags(
1✔
3264
            s3_bucket.bucket_arn, context.account_id, s3_bucket.bucket_region
3265
        )
3266
        if not tag_set:
1✔
3267
            raise NoSuchTagSet(
1✔
3268
                "The TagSet does not exist",
3269
                BucketName=bucket,
3270
            )
3271

3272
        return GetBucketTaggingOutput(TagSet=tag_set)
1✔
3273

3274
    def delete_bucket_tagging(
1✔
3275
        self,
3276
        context: RequestContext,
3277
        bucket: BucketName,
3278
        expected_bucket_owner: AccountId = None,
3279
        **kwargs,
3280
    ) -> None:
3281
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3282

3283
        # This operation doesn't remove the tags from the store like deleting a resource does; it just sets them to an empty list.
3284
        self._remove_all_bucket_tags(
1✔
3285
            s3_bucket.bucket_arn, context.account_id, s3_bucket.bucket_region
3286
        )
3287
        self._create_bucket_tags(
1✔
3288
            s3_bucket.bucket_arn, context.account_id, s3_bucket.bucket_region, []
3289
        )
3290

3291
    def put_object_tagging(
1✔
3292
        self,
3293
        context: RequestContext,
3294
        bucket: BucketName,
3295
        key: ObjectKey,
3296
        tagging: Tagging,
3297
        version_id: ObjectVersionId = None,
3298
        content_md5: ContentMD5 = None,
3299
        checksum_algorithm: ChecksumAlgorithm = None,
3300
        expected_bucket_owner: AccountId = None,
3301
        request_payer: RequestPayer = None,
3302
        **kwargs,
3303
    ) -> PutObjectTaggingOutput:
3304
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3305

3306
        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="PUT")
1✔
3307

3308
        if "TagSet" not in tagging:
1✔
UNCOV
3309
            raise MalformedXML()
×
3310

3311
        tag_set = tagging["TagSet"] or []
1✔
3312
        validate_tag_set(tag_set, type_set="object")
1✔
3313

3314
        key_id = get_unique_key_id(bucket, key, s3_object.version_id)
1✔
3315
        # remove the previous tags before setting the new ones; the request overwrites the whole TagSet
3316
        store.TAGS.tags.pop(key_id, None)
1✔
3317
        store.TAGS.tag_resource(key_id, tags=tag_set)
1✔
3318
        response = PutObjectTaggingOutput()
1✔
3319
        if s3_object.version_id:
1✔
3320
            response["VersionId"] = s3_object.version_id
1✔
3321

3322
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
3323

3324
        return response
1✔
3325
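    # Hedged usage sketch: tagging an object; on a versioned bucket the tags are attached
    # to the specific version and the VersionId is echoed back (names are illustrative
    # assumptions).
    #
    #   s3.put_object_tagging(
    #       Bucket="demo-bucket", Key="big-object.bin",
    #       Tagging={"TagSet": [{"Key": "tier", "Value": "archive"}]},
    #   )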

3326
    def get_object_tagging(
1✔
3327
        self,
3328
        context: RequestContext,
3329
        bucket: BucketName,
3330
        key: ObjectKey,
3331
        version_id: ObjectVersionId = None,
3332
        expected_bucket_owner: AccountId = None,
3333
        request_payer: RequestPayer = None,
3334
        **kwargs,
3335
    ) -> GetObjectTaggingOutput:
3336
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3337

3338
        try:
1✔
3339
            s3_object = s3_bucket.get_object(key=key, version_id=version_id)
1✔
3340
        except NoSuchKey as e:
1✔
3341
            # it seems GetObjectTagging does not work like all other operations, so we need to raise a different
3342
            # exception. As we already need to catch it because of the format of the Key, it is not worth modifying the
3343
            # `S3Bucket.get_object` signature for one operation.
3344
            if s3_bucket.versioning_status and (
1✔
3345
                s3_object_version := s3_bucket.objects.get(key, version_id)
3346
            ):
3347
                raise MethodNotAllowed(
1✔
3348
                    "The specified method is not allowed against this resource.",
3349
                    Method="GET",
3350
                    ResourceType="DeleteMarker",
3351
                    DeleteMarker=True,
3352
                    Allow="DELETE",
3353
                    VersionId=s3_object_version.version_id,
3354
                )
3355

3356
            # There is a weird, AWS-validated bug in S3: the returned key contains the bucket name as well
3357
            # follow AWS on this one
3358
            e.Key = f"{bucket}/{key}"
1✔
3359
            raise e
1✔
3360

3361
        tag_set = store.TAGS.list_tags_for_resource(
1✔
3362
            get_unique_key_id(bucket, key, s3_object.version_id)
3363
        )["Tags"]
3364
        response = GetObjectTaggingOutput(TagSet=tag_set)
1✔
3365
        if s3_object.version_id:
1✔
3366
            response["VersionId"] = s3_object.version_id
1✔
3367

3368
        return response
1✔
3369

3370
    def delete_object_tagging(
1✔
3371
        self,
3372
        context: RequestContext,
3373
        bucket: BucketName,
3374
        key: ObjectKey,
3375
        version_id: ObjectVersionId = None,
3376
        expected_bucket_owner: AccountId = None,
3377
        **kwargs,
3378
    ) -> DeleteObjectTaggingOutput:
3379
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3380

3381
        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="DELETE")
1✔
3382

3383
        store.TAGS.tags.pop(get_unique_key_id(bucket, key, s3_object.version_id), None)
1✔
3384
        response = DeleteObjectTaggingOutput()
1✔
3385
        if s3_object.version_id:
1✔
3386
            response["VersionId"] = s3_object.version_id
1✔
3387

3388
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
3389

3390
        return response
1✔
3391

3392
    def put_bucket_cors(
1✔
3393
        self,
3394
        context: RequestContext,
3395
        bucket: BucketName,
3396
        cors_configuration: CORSConfiguration,
3397
        content_md5: ContentMD5 = None,
3398
        checksum_algorithm: ChecksumAlgorithm = None,
3399
        expected_bucket_owner: AccountId = None,
3400
        **kwargs,
3401
    ) -> None:
3402
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3403
        validate_cors_configuration(cors_configuration)
1✔
3404
        s3_bucket.cors_rules = cors_configuration
1✔
3405
        self._cors_handler.invalidate_cache()
1✔
3406
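    # Hedged usage sketch: a minimal CORS configuration (origins, methods and headers are
    # illustrative assumptions).
    #
    #   s3.put_bucket_cors(
    #       Bucket="demo-bucket",
    #       CORSConfiguration={"CORSRules": [{
    #           "AllowedOrigins": ["https://example.com"],
    #           "AllowedMethods": ["GET", "PUT"],
    #           "AllowedHeaders": ["*"],
    #           "MaxAgeSeconds": 3000,
    #       }]},
    #   )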

3407
    def get_bucket_cors(
1✔
3408
        self,
3409
        context: RequestContext,
3410
        bucket: BucketName,
3411
        expected_bucket_owner: AccountId = None,
3412
        **kwargs,
3413
    ) -> GetBucketCorsOutput:
3414
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3415

3416
        if not s3_bucket.cors_rules:
1✔
3417
            raise NoSuchCORSConfiguration(
1✔
3418
                "The CORS configuration does not exist",
3419
                BucketName=bucket,
3420
            )
3421
        return GetBucketCorsOutput(CORSRules=s3_bucket.cors_rules["CORSRules"])
1✔
3422

3423
    def delete_bucket_cors(
1✔
3424
        self,
3425
        context: RequestContext,
3426
        bucket: BucketName,
3427
        expected_bucket_owner: AccountId = None,
3428
        **kwargs,
3429
    ) -> None:
3430
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3431

3432
        if s3_bucket.cors_rules:
1✔
3433
            self._cors_handler.invalidate_cache()
1✔
3434
            s3_bucket.cors_rules = None
1✔
3435

3436
    def get_bucket_lifecycle_configuration(
1✔
3437
        self,
3438
        context: RequestContext,
3439
        bucket: BucketName,
3440
        expected_bucket_owner: AccountId = None,
3441
        **kwargs,
3442
    ) -> GetBucketLifecycleConfigurationOutput:
3443
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3444

3445
        if not s3_bucket.lifecycle_rules:
1✔
3446
            raise NoSuchLifecycleConfiguration(
1✔
3447
                "The lifecycle configuration does not exist",
3448
                BucketName=bucket,
3449
            )
3450

3451
        return GetBucketLifecycleConfigurationOutput(
1✔
3452
            Rules=s3_bucket.lifecycle_rules,
3453
            TransitionDefaultMinimumObjectSize=s3_bucket.transition_default_minimum_object_size,
3454
        )
3455

3456
    def put_bucket_lifecycle_configuration(
1✔
3457
        self,
3458
        context: RequestContext,
3459
        bucket: BucketName,
3460
        checksum_algorithm: ChecksumAlgorithm = None,
3461
        lifecycle_configuration: BucketLifecycleConfiguration = None,
3462
        expected_bucket_owner: AccountId = None,
3463
        transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize = None,
3464
        **kwargs,
3465
    ) -> PutBucketLifecycleConfigurationOutput:
3466
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3467

3468
        transition_min_obj_size = (
1✔
3469
            transition_default_minimum_object_size
3470
            or TransitionDefaultMinimumObjectSize.all_storage_classes_128K
3471
        )
3472

3473
        if transition_min_obj_size not in (
1✔
3474
            TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
3475
            TransitionDefaultMinimumObjectSize.varies_by_storage_class,
3476
        ):
3477
            raise InvalidRequest(
1✔
3478
                f"Invalid TransitionDefaultMinimumObjectSize found: {transition_min_obj_size}"
3479
            )
3480

3481
        validate_lifecycle_configuration(lifecycle_configuration)
1✔
3482
        # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to apply them
3483
        #  every time we get/head an object
3484
        # for now, we keep a cache and get it every time we fetch an object
3485
        s3_bucket.lifecycle_rules = lifecycle_configuration["Rules"]
1✔
3486
        s3_bucket.transition_default_minimum_object_size = transition_min_obj_size
1✔
3487
        self._expiration_cache[bucket].clear()
1✔
3488
        return PutBucketLifecycleConfigurationOutput(
1✔
3489
            TransitionDefaultMinimumObjectSize=transition_min_obj_size
3490
        )
3491

3492
    def delete_bucket_lifecycle(
1✔
3493
        self,
3494
        context: RequestContext,
3495
        bucket: BucketName,
3496
        expected_bucket_owner: AccountId = None,
3497
        **kwargs,
3498
    ) -> None:
3499
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3500

3501
        s3_bucket.lifecycle_rules = None
1✔
3502
        self._expiration_cache[bucket].clear()
1✔
3503

3504
    def put_bucket_analytics_configuration(
1✔
3505
        self,
3506
        context: RequestContext,
3507
        bucket: BucketName,
3508
        id: AnalyticsId,
3509
        analytics_configuration: AnalyticsConfiguration,
3510
        expected_bucket_owner: AccountId = None,
3511
        **kwargs,
3512
    ) -> None:
3513
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3514

3515
        validate_bucket_analytics_configuration(
1✔
3516
            id=id, analytics_configuration=analytics_configuration
3517
        )
3518

3519
        s3_bucket.analytics_configurations[id] = analytics_configuration
1✔
3520

3521
    def get_bucket_analytics_configuration(
1✔
3522
        self,
3523
        context: RequestContext,
3524
        bucket: BucketName,
3525
        id: AnalyticsId,
3526
        expected_bucket_owner: AccountId = None,
3527
        **kwargs,
3528
    ) -> GetBucketAnalyticsConfigurationOutput:
3529
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3530

3531
        if not (analytic_config := s3_bucket.analytics_configurations.get(id)):
1✔
3532
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3533

3534
        return GetBucketAnalyticsConfigurationOutput(AnalyticsConfiguration=analytic_config)
1✔
3535

3536
    def list_bucket_analytics_configurations(
1✔
3537
        self,
3538
        context: RequestContext,
3539
        bucket: BucketName,
3540
        continuation_token: Token = None,
3541
        expected_bucket_owner: AccountId = None,
3542
        **kwargs,
3543
    ) -> ListBucketAnalyticsConfigurationsOutput:
3544
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3545

3546
        return ListBucketAnalyticsConfigurationsOutput(
1✔
3547
            IsTruncated=False,
3548
            AnalyticsConfigurationList=sorted(
3549
                s3_bucket.analytics_configurations.values(),
3550
                key=itemgetter("Id"),
3551
            ),
3552
        )
3553

3554
    def delete_bucket_analytics_configuration(
1✔
3555
        self,
3556
        context: RequestContext,
3557
        bucket: BucketName,
3558
        id: AnalyticsId,
3559
        expected_bucket_owner: AccountId = None,
3560
        **kwargs,
3561
    ) -> None:
3562
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3563

3564
        if not s3_bucket.analytics_configurations.pop(id, None):
1✔
3565
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3566

3567
    def put_bucket_intelligent_tiering_configuration(
1✔
3568
        self,
3569
        context: RequestContext,
3570
        bucket: BucketName,
3571
        id: IntelligentTieringId,
3572
        intelligent_tiering_configuration: IntelligentTieringConfiguration,
3573
        expected_bucket_owner: AccountId | None = None,
3574
        **kwargs,
3575
    ) -> None:
3576
        # TODO add support for expected_bucket_owner
3577
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3578

3579
        validate_bucket_intelligent_tiering_configuration(id, intelligent_tiering_configuration)
1✔
3580

3581
        s3_bucket.intelligent_tiering_configurations[id] = intelligent_tiering_configuration
1✔
3582

3583
    def get_bucket_intelligent_tiering_configuration(
1✔
3584
        self,
3585
        context: RequestContext,
3586
        bucket: BucketName,
3587
        id: IntelligentTieringId,
3588
        expected_bucket_owner: AccountId | None = None,
3589
        **kwargs,
3590
    ) -> GetBucketIntelligentTieringConfigurationOutput:
3591
        # TODO add support for expected_bucket_owner
3592
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3593

3594
        if not (itier_config := s3_bucket.intelligent_tiering_configurations.get(id)):
1✔
UNCOV
3595
            raise NoSuchConfiguration("The specified configuration does not exist.")
×
3596

3597
        return GetBucketIntelligentTieringConfigurationOutput(
1✔
3598
            IntelligentTieringConfiguration=itier_config
3599
        )
3600

3601
    def delete_bucket_intelligent_tiering_configuration(
1✔
3602
        self,
3603
        context: RequestContext,
3604
        bucket: BucketName,
3605
        id: IntelligentTieringId,
3606
        expected_bucket_owner: AccountId | None = None,
3607
        **kwargs,
3608
    ) -> None:
3609
        # TODO add support for expected_bucket_owner
3610
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3611

3612
        if not s3_bucket.intelligent_tiering_configurations.pop(id, None):
1✔
3613
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3614

3615
    def list_bucket_intelligent_tiering_configurations(
1✔
3616
        self,
3617
        context: RequestContext,
3618
        bucket: BucketName,
3619
        continuation_token: Token | None = None,
3620
        expected_bucket_owner: AccountId | None = None,
3621
        **kwargs,
3622
    ) -> ListBucketIntelligentTieringConfigurationsOutput:
3623
        # TODO add support for expected_bucket_owner
3624
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3625

3626
        return ListBucketIntelligentTieringConfigurationsOutput(
1✔
3627
            IsTruncated=False,
3628
            IntelligentTieringConfigurationList=sorted(
3629
                s3_bucket.intelligent_tiering_configurations.values(),
3630
                key=itemgetter("Id"),
3631
            ),
3632
        )
3633

3634
    def put_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        inventory_configuration: InventoryConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_inventory_configuration(
            config_id=id, inventory_configuration=inventory_configuration
        )
        s3_bucket.inventory_configurations[id] = inventory_configuration

    def get_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketInventoryConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (inv_config := s3_bucket.inventory_configurations.get(id)):
            raise NoSuchConfiguration("The specified configuration does not exist.")
        return GetBucketInventoryConfigurationOutput(InventoryConfiguration=inv_config)

    def list_bucket_inventory_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketInventoryConfigurationsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return ListBucketInventoryConfigurationsOutput(
            IsTruncated=False,
            InventoryConfigurationList=sorted(
                s3_bucket.inventory_configurations.values(), key=itemgetter("Id")
            ),
        )

    def delete_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.inventory_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")

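    # Illustrative sketch (not part of the provider): putting and reading back an inventory
    # configuration with boto3. The names and destination bucket ARN are assumptions.
    #
    #   s3.put_bucket_inventory_configuration(
    #       Bucket="example-bucket",
    #       Id="daily-inventory",
    #       InventoryConfiguration={
    #           "Id": "daily-inventory",
    #           "IsEnabled": True,
    #           "IncludedObjectVersions": "Current",
    #           "Schedule": {"Frequency": "Daily"},
    #           "Destination": {
    #               "S3BucketDestination": {
    #                   "Bucket": "arn:aws:s3:::example-inventory-bucket",
    #                   "Format": "CSV",
    #               }
    #           },
    #       },
    #   )
    #   s3.get_bucket_inventory_configuration(Bucket="example-bucket", Id="daily-inventory")
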
    def get_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketWebsiteOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.website_configuration:
            raise NoSuchWebsiteConfiguration(
                "The specified bucket does not have a website configuration",
                BucketName=bucket,
            )
        return s3_bucket.website_configuration

    def put_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        website_configuration: WebsiteConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_website_configuration(website_configuration)
        s3_bucket.website_configuration = website_configuration

    def delete_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        # does not raise an error if the bucket did not have a config; simply returns
        s3_bucket.website_configuration = None

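    # Illustrative sketch (not part of the provider): configuring the bucket as a static
    # website, as validated by put_bucket_website above. Document names are assumptions.
    #
    #   s3.put_bucket_website(
    #       Bucket="example-bucket",
    #       WebsiteConfiguration={
    #           "IndexDocument": {"Suffix": "index.html"},
    #           "ErrorDocument": {"Key": "error.html"},
    #       },
    #   )
    #   website = s3.get_bucket_website(Bucket="example-bucket")
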
    def get_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLockConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise ObjectLockConfigurationNotFoundError(
                "Object Lock configuration does not exist for this bucket",
                BucketName=bucket,
            )

        response = GetObjectLockConfigurationOutput(
            ObjectLockConfiguration=ObjectLockConfiguration(
                ObjectLockEnabled=ObjectLockEnabled.Enabled
            )
        )
        if s3_bucket.object_lock_default_retention:
            response["ObjectLockConfiguration"]["Rule"] = {
                "DefaultRetention": s3_bucket.object_lock_default_retention
            }

        return response

    def put_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        object_lock_configuration: ObjectLockConfiguration = None,
        request_payer: RequestPayer = None,
        token: ObjectLockToken = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLockConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if s3_bucket.versioning_status != "Enabled":
            raise InvalidBucketState(
                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration"
            )

        if (
            not object_lock_configuration
            or object_lock_configuration.get("ObjectLockEnabled") != "Enabled"
        ):
            raise MalformedXML()

        if "Rule" not in object_lock_configuration:
            s3_bucket.object_lock_default_retention = None
            if not s3_bucket.object_lock_enabled:
                s3_bucket.object_lock_enabled = True

            return PutObjectLockConfigurationOutput()
        elif not (rule := object_lock_configuration["Rule"]) or not (
            default_retention := rule.get("DefaultRetention")
        ):
            raise MalformedXML()

        if "Mode" not in default_retention or (
            ("Days" in default_retention and "Years" in default_retention)
            or ("Days" not in default_retention and "Years" not in default_retention)
        ):
            raise MalformedXML()

        if default_retention["Mode"] not in OBJECT_LOCK_MODES:
            raise MalformedXML()

        s3_bucket.object_lock_default_retention = default_retention
        if not s3_bucket.object_lock_enabled:
            s3_bucket.object_lock_enabled = True

        return PutObjectLockConfigurationOutput()

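    # Illustrative sketch (not part of the provider): enabling Object Lock with a default
    # retention rule. put_object_lock_configuration above requires versioning to be enabled
    # first; the bucket name is an assumption.
    #
    #   s3.put_bucket_versioning(
    #       Bucket="example-bucket", VersioningConfiguration={"Status": "Enabled"}
    #   )
    #   s3.put_object_lock_configuration(
    #       Bucket="example-bucket",
    #       ObjectLockConfiguration={
    #           "ObjectLockEnabled": "Enabled",
    #           "Rule": {"DefaultRetention": {"Mode": "GOVERNANCE", "Days": 1}},
    #       },
    #   )
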
    def get_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLegalHoldOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",
        )
        if not s3_object.lock_legal_status:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        return GetObjectLegalHoldOutput(
            LegalHold=ObjectLockLegalHold(Status=s3_object.lock_legal_status)
        )

    def put_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        legal_hold: ObjectLockLegalHold = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLegalHoldOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not legal_hold:
            raise MalformedXML()

        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )
        # TODO: check casing
        if not (status := legal_hold.get("Status")) or status not in ("ON", "OFF"):
            raise MalformedXML()

        s3_object.lock_legal_status = status

        # TODO: return RequestCharged
        return PutObjectLegalHoldOutput()

    def get_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectRetentionOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",
        )
        if not s3_object.lock_mode:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        return GetObjectRetentionOutput(
            Retention=ObjectLockRetention(
                Mode=s3_object.lock_mode,
                RetainUntilDate=s3_object.lock_until,
            )
        )

    def put_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        retention: ObjectLockRetention = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectRetentionOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )

        if retention and (
            not validate_dict_fields(retention, required_fields={"Mode", "RetainUntilDate"})
            or retention["Mode"] not in OBJECT_LOCK_MODES
        ):
            raise MalformedXML()

        if retention and retention["RetainUntilDate"] < datetime.datetime.now(datetime.UTC):
            # weirdly, this date is formatted as follows: Tue Dec 31 16:00:00 PST 2019
            # it contains the timezone as PST, even if you target a bucket in Europe or Asia
            pst_datetime = retention["RetainUntilDate"].astimezone(
                tz=ZoneInfo("America/Los_Angeles")
            )
            raise InvalidArgument(
                "The retain until date must be in the future!",
                ArgumentName="RetainUntilDate",
                ArgumentValue=pst_datetime.strftime("%a %b %d %H:%M:%S %Z %Y"),
            )

        is_request_reducing_locking = (
            not retention
            or (s3_object.lock_until and s3_object.lock_until > retention["RetainUntilDate"])
            or (
                retention["Mode"] == ObjectLockMode.GOVERNANCE
                and s3_object.lock_mode == ObjectLockMode.COMPLIANCE
            )
        )
        if is_request_reducing_locking and (
            s3_object.lock_mode == ObjectLockMode.COMPLIANCE
            or (
                s3_object.lock_mode == ObjectLockMode.GOVERNANCE and not bypass_governance_retention
            )
        ):
            raise AccessDenied("Access Denied because object protected by object lock.")

        s3_object.lock_mode = retention["Mode"] if retention else None
        s3_object.lock_until = retention["RetainUntilDate"] if retention else None

        # TODO: return RequestCharged
        return PutObjectRetentionOutput()

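    # Illustrative sketch (not part of the provider): setting and then relaxing a retention
    # period. Reducing GOVERNANCE-mode protection requires BypassGovernanceRetention, matching
    # the checks in put_object_retention above. Key and dates are assumptions.
    #
    #   from datetime import datetime, timedelta, timezone
    #
    #   until = datetime.now(timezone.utc) + timedelta(days=1)
    #   s3.put_object_retention(
    #       Bucket="example-bucket",
    #       Key="protected.txt",
    #       Retention={"Mode": "GOVERNANCE", "RetainUntilDate": until},
    #   )
    #   s3.put_object_retention(
    #       Bucket="example-bucket",
    #       Key="protected.txt",
    #       Retention={"Mode": "GOVERNANCE", "RetainUntilDate": until - timedelta(hours=12)},
    #       BypassGovernanceRetention=True,
    #   )
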
    def put_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        request_payment_configuration: RequestPaymentConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation, but its actual effect is not emulated
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        payer = request_payment_configuration.get("Payer")
        if payer not in ["Requester", "BucketOwner"]:
            raise MalformedXML()

        s3_bucket.payer = payer

    def get_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketRequestPaymentOutput:
        # TODO: this currently only mocks the operation, but its actual effect is not emulated
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return GetBucketRequestPaymentOutput(Payer=s3_bucket.payer)

    def get_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketOwnershipControlsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.object_ownership:
            raise OwnershipControlsNotFoundError(
                "The bucket ownership controls were not found",
                BucketName=bucket,
            )

        return GetBucketOwnershipControlsOutput(
            OwnershipControls={"Rules": [{"ObjectOwnership": s3_bucket.object_ownership}]}
        )

    def put_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        ownership_controls: OwnershipControls,
        content_md5: ContentMD5 | None = None,
        expected_bucket_owner: AccountId | None = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation, but its actual effect is not emulated
        #  for example, it should forbid most ACL usage when set to BucketOwnerEnforced
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (rules := ownership_controls.get("Rules")) or len(rules) > 1:
            raise MalformedXML()

        rule = rules[0]
        if (object_ownership := rule.get("ObjectOwnership")) not in OBJECT_OWNERSHIPS:
            raise MalformedXML()

        s3_bucket.object_ownership = object_ownership

    def delete_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.object_ownership = None

    def get_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetPublicAccessBlockOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.public_access_block:
            raise NoSuchPublicAccessBlockConfiguration(
                "The public access block configuration was not found", BucketName=bucket
            )

        return GetPublicAccessBlockOutput(
            PublicAccessBlockConfiguration=s3_bucket.public_access_block
        )

    def put_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        public_access_block_configuration: PublicAccessBlockConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation, but its actual effect is not emulated
        #  as we do not enforce ACL directly. Also, this should take the most restrictive between S3Control and the
        #  bucket configuration. See s3control
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        public_access_block_fields = {
            "BlockPublicAcls",
            "BlockPublicPolicy",
            "IgnorePublicAcls",
            "RestrictPublicBuckets",
        }
        if not validate_dict_fields(
            public_access_block_configuration,
            required_fields=set(),
            optional_fields=public_access_block_fields,
        ):
            raise MalformedXML()

        for field in public_access_block_fields:
            if public_access_block_configuration.get(field) is None:
                public_access_block_configuration[field] = False

        s3_bucket.public_access_block = public_access_block_configuration

    def delete_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.public_access_block = None

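    # Illustrative sketch (not part of the provider): omitted flags default to False in
    # put_public_access_block above, so a partial configuration like this is normalized to all
    # four fields. The bucket name is an assumption.
    #
    #   s3.put_public_access_block(
    #       Bucket="example-bucket",
    #       PublicAccessBlockConfiguration={"BlockPublicAcls": True, "IgnorePublicAcls": True},
    #   )
    #   s3.get_public_access_block(Bucket="example-bucket")
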
    def get_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyOutput:
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )
        if not s3_bucket.policy:
            raise NoSuchBucketPolicy(
                "The bucket policy does not exist",
                BucketName=bucket,
            )
        return GetBucketPolicyOutput(Policy=s3_bucket.policy)

    def put_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        policy: Policy,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not policy or policy[0] != "{":
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")
        try:
            json_policy = json.loads(policy)
            if not json_policy:
                # TODO: add more validation around the policy?
                raise MalformedPolicy("Missing required field Statement")
        except ValueError:
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")

        s3_bucket.policy = policy

    def delete_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        s3_bucket.policy = None

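    # Illustrative sketch (not part of the provider): the policy must be a JSON string whose
    # first byte is '{', as enforced by put_bucket_policy above. Principal and resource values
    # are assumptions for the example only.
    #
    #   policy = {
    #       "Version": "2012-10-17",
    #       "Statement": [
    #           {
    #               "Effect": "Allow",
    #               "Principal": "*",
    #               "Action": "s3:GetObject",
    #               "Resource": "arn:aws:s3:::example-bucket/*",
    #           }
    #       ],
    #   }
    #   s3.put_bucket_policy(Bucket="example-bucket", Policy=json.dumps(policy))
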
    def get_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetBucketAccelerateConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        response = GetBucketAccelerateConfigurationOutput()
        if s3_bucket.accelerate_status:
            response["Status"] = s3_bucket.accelerate_status

        return response

    def put_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        accelerate_configuration: AccelerateConfiguration,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if "." in bucket:
            raise InvalidRequest(
                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names"
            )

        if not (status := accelerate_configuration.get("Status")) or status not in (
            "Enabled",
            "Suspended",
        ):
            raise MalformedXML()

        s3_bucket.accelerate_status = status

    def put_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        bucket_logging_status: BucketLoggingStatus,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (logging_config := bucket_logging_status.get("LoggingEnabled")):
            s3_bucket.logging = {}
            return

        # the target bucket must be in the same account
        if not (target_bucket_name := logging_config.get("TargetBucket")):
            raise MalformedXML()

        if not logging_config.get("TargetPrefix"):
            logging_config["TargetPrefix"] = ""

        # TODO: validate Grants

        if not (target_s3_bucket := store.buckets.get(target_bucket_name)):
            raise InvalidTargetBucketForLogging(
                "The target bucket for logging does not exist",
                TargetBucket=target_bucket_name,
            )

        source_bucket_region = s3_bucket.bucket_region
        if target_s3_bucket.bucket_region != source_bucket_region:
            raise (
                CrossLocationLoggingProhibitted(
                    "Cross S3 location logging not allowed. ",
                    TargetBucketLocation=target_s3_bucket.bucket_region,
                )
                if source_bucket_region == AWS_REGION_US_EAST_1
                else CrossLocationLoggingProhibitted(
                    "Cross S3 location logging not allowed. ",
                    SourceBucketLocation=source_bucket_region,
                    TargetBucketLocation=target_s3_bucket.bucket_region,
                )
            )

        s3_bucket.logging = logging_config

    def get_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLoggingOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.logging:
            return GetBucketLoggingOutput()

        return GetBucketLoggingOutput(LoggingEnabled=s3_bucket.logging)

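    # Illustrative sketch (not part of the provider): enabling server access logging; the
    # target bucket must exist in the same region, as checked above. Names are assumptions.
    #
    #   s3.put_bucket_logging(
    #       Bucket="example-bucket",
    #       BucketLoggingStatus={
    #           "LoggingEnabled": {"TargetBucket": "example-log-bucket", "TargetPrefix": "logs/"}
    #       },
    #   )
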
    def put_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        replication_configuration: ReplicationConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        token: ObjectLockToken = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.versioning_status == BucketVersioningStatus.Enabled:
            raise InvalidRequest(
                "Versioning must be 'Enabled' on the bucket to apply a replication configuration"
            )

        if not (rules := replication_configuration.get("Rules")):
            raise MalformedXML()

        for rule in rules:
            if "ID" not in rule:
                rule["ID"] = short_uid()

            dest_bucket_arn = rule.get("Destination", {}).get("Bucket")
            dest_bucket_name = s3_bucket_name(dest_bucket_arn)
            if (
                not (dest_s3_bucket := store.buckets.get(dest_bucket_name))
                or not dest_s3_bucket.versioning_status == BucketVersioningStatus.Enabled
            ):
                # according to AWS testing the same exception is raised if the bucket does not exist
                # or if versioning was disabled
                raise InvalidRequest("Destination bucket must have versioning enabled.")

        # TODO more validation on input
        s3_bucket.replication = replication_configuration

    def get_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketReplicationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.replication:
            raise ReplicationConfigurationNotFoundError(
                "The replication configuration was not found",
                BucketName=bucket,
            )

        return GetBucketReplicationOutput(ReplicationConfiguration=s3_bucket.replication)

    def delete_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.replication = None

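    # Illustrative sketch (not part of the provider): both source and destination buckets must
    # have versioning enabled before put_bucket_replication above accepts the rule. The role
    # ARN and bucket names are assumptions.
    #
    #   s3.put_bucket_replication(
    #       Bucket="example-bucket",
    #       ReplicationConfiguration={
    #           "Role": "arn:aws:iam::000000000000:role/replication-role",
    #           "Rules": [
    #               {
    #                   "Status": "Enabled",
    #                   "Priority": 1,
    #                   "Filter": {},
    #                   "DeleteMarkerReplication": {"Status": "Disabled"},
    #                   "Destination": {"Bucket": "arn:aws:s3:::example-replica-bucket"},
    #               }
    #           ],
    #       },
    #   )
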
    @handler("PutBucketAcl", expand=False)
1✔
4342
    def put_bucket_acl(
1✔
4343
        self,
4344
        context: RequestContext,
4345
        request: PutBucketAclRequest,
4346
    ) -> None:
4347
        bucket = request["Bucket"]
1✔
4348
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4349
        acp = get_access_control_policy_from_acl_request(
1✔
4350
            request=request, owner=s3_bucket.owner, request_body=context.request.data
4351
        )
4352
        s3_bucket.acl = acp
1✔
4353

4354
    def get_bucket_acl(
1✔
4355
        self,
4356
        context: RequestContext,
4357
        bucket: BucketName,
4358
        expected_bucket_owner: AccountId = None,
4359
        **kwargs,
4360
    ) -> GetBucketAclOutput:
4361
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4362

4363
        return GetBucketAclOutput(Owner=s3_bucket.acl["Owner"], Grants=s3_bucket.acl["Grants"])
1✔
4364

4365
    @handler("PutObjectAcl", expand=False)
1✔
4366
    def put_object_acl(
1✔
4367
        self,
4368
        context: RequestContext,
4369
        request: PutObjectAclRequest,
4370
    ) -> PutObjectAclOutput:
4371
        bucket = request["Bucket"]
1✔
4372
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4373

4374
        s3_object = s3_bucket.get_object(
1✔
4375
            key=request["Key"],
4376
            version_id=request.get("VersionId"),
4377
            http_method="PUT",
4378
        )
4379
        acp = get_access_control_policy_from_acl_request(
1✔
4380
            request=request, owner=s3_object.owner, request_body=context.request.data
4381
        )
4382
        previous_acl = s3_object.acl
1✔
4383
        s3_object.acl = acp
1✔
4384

4385
        if previous_acl != acp:
1✔
4386
            self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
4387

4388
        # TODO: RequestCharged
4389
        return PutObjectAclOutput()
1✔
4390

4391
    def get_object_acl(
1✔
4392
        self,
4393
        context: RequestContext,
4394
        bucket: BucketName,
4395
        key: ObjectKey,
4396
        version_id: ObjectVersionId = None,
4397
        request_payer: RequestPayer = None,
4398
        expected_bucket_owner: AccountId = None,
4399
        **kwargs,
4400
    ) -> GetObjectAclOutput:
4401
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4402

4403
        s3_object = s3_bucket.get_object(
1✔
4404
            key=key,
4405
            version_id=version_id,
4406
        )
4407
        # TODO: RequestCharged
4408
        return GetObjectAclOutput(Owner=s3_object.acl["Owner"], Grants=s3_object.acl["Grants"])
1✔
4409

4410
    def get_bucket_policy_status(
1✔
4411
        self,
4412
        context: RequestContext,
4413
        bucket: BucketName,
4414
        expected_bucket_owner: AccountId = None,
4415
        **kwargs,
4416
    ) -> GetBucketPolicyStatusOutput:
4417
        raise NotImplementedError
4418

4419
    def get_object_torrent(
1✔
4420
        self,
4421
        context: RequestContext,
4422
        bucket: BucketName,
4423
        key: ObjectKey,
4424
        request_payer: RequestPayer = None,
4425
        expected_bucket_owner: AccountId = None,
4426
        **kwargs,
4427
    ) -> GetObjectTorrentOutput:
4428
        raise NotImplementedError
4429

4430
    def post_object(
        self, context: RequestContext, bucket: BucketName, body: IO[Body] = None, **kwargs
    ) -> PostResponse:
        if "multipart/form-data" not in context.request.headers.get("Content-Type", ""):
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition="Bucket POST must be of the enclosure-type multipart/form-data",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html
        # TODO: signature validation is not implemented for pre-signed POST
        # policy validation is not implemented either, except expiration and mandatory fields
        # This operation is the only one using form for storing the request data. We will have to do some manual
        # parsing here, as no specs are present for this, as no client directly implements this operation.
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        form = context.request.form
        object_key = context.request.form.get("key")

        if "file" in form:
            # in AWS, you can pass the file content as a string in the form field and not as a file object
            file_data = to_bytes(form["file"])
            object_content_length = len(file_data)
            stream = BytesIO(file_data)
        else:
            # this is the default behaviour
            fileobj = context.request.files["file"]
            stream = fileobj.stream
            # stream is a SpooledTemporaryFile, so we can seek the stream to know its length, necessary for policy
            # validation
            original_pos = stream.tell()
            object_content_length = stream.seek(0, 2)
            # reset the stream and put it back at its original position
            stream.seek(original_pos, 0)

            if "${filename}" in object_key:
                # TODO: ${filename} is actually usable in all form fields
                # See https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/PresignedPost.html
                # > The string ${filename} is automatically replaced with the name of the file provided by the user and
                # is recognized by all form fields.
                object_key = object_key.replace("${filename}", fileobj.filename)

        # TODO: see if we need to pass additional metadata not contained in the policy from the table under
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
        additional_policy_metadata = {
            "bucket": bucket,
            "content_length": object_content_length,
        }
        validate_post_policy(form, additional_policy_metadata)

        if canned_acl := form.get("acl"):
            validate_canned_acl(canned_acl)
            acp = get_canned_acl(canned_acl, owner=s3_bucket.owner)
        else:
            acp = get_canned_acl(BucketCannedACL.private, owner=s3_bucket.owner)

        post_system_settable_headers = [
            "Cache-Control",
            "Content-Type",
            "Content-Disposition",
            "Content-Encoding",
        ]
        system_metadata = {}
        for system_metadata_field in post_system_settable_headers:
            if field_value := form.get(system_metadata_field):
                system_key = system_metadata_field.replace("-", "")
                system_metadata[system_key] = field_value

        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        user_metadata = {
            field.removeprefix("x-amz-meta-").lower(): form.get(field)
            for field in form
            if field.startswith("x-amz-meta-")
        }

        if tagging := form.get("tagging"):
            # this is weird, as it's direct XML in the form, we need to parse it directly
            tagging = parse_post_object_tagging_xml(tagging)

        if (storage_class := form.get("x-amz-storage-class")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        encryption_request = {
            "ServerSideEncryption": form.get("x-amz-server-side-encryption"),
            "SSEKMSKeyId": form.get("x-amz-server-side-encryption-aws-kms-key-id"),
            "BucketKeyEnabled": form.get("x-amz-server-side-encryption-bucket-key-enabled"),
        }

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            encryption_request,
            s3_bucket,
            store,
        )

        checksum_algorithm = form.get("x-amz-checksum-algorithm")
        checksum_value = (
            form.get(f"x-amz-checksum-{checksum_algorithm.lower()}") if checksum_algorithm else None
        )
        expires = (
            str_to_rfc_1123_datetime(expires_str) if (expires_str := form.get("Expires")) else None
        )

        version_id = generate_version_id(s3_bucket.versioning_status)

        s3_object = S3Object(
            key=object_key,
            version_id=version_id,
            storage_class=storage_class,
            expires=expires,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            website_redirect_location=form.get("x-amz-website-redirect-location"),
            acl=acp,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depend on Bucket settings
        )

        with self._storage_backend.open(bucket, s3_object, mode="w") as s3_stored_object:
            s3_stored_object.write(stream)

            if not s3_object.checksum_value:
                s3_object.checksum_value = s3_stored_object.checksum

            elif checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
                self._storage_backend.remove(bucket, s3_object)
                raise InvalidRequest(
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
                )

            s3_bucket.objects.set(object_key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket, object_key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        response = PostResponse()
        # hacky way to set the etag in the headers as well: two locations for one value
        response["ETagHeader"] = s3_object.quoted_etag

        if redirect := form.get("success_action_redirect"):
            # we need to create the redirect, as the parser could not return the moto-calculated one
            try:
                redirect = create_redirect_for_post_request(
                    base_redirect=redirect,
                    bucket=bucket,
                    object_key=object_key,
                    etag=s3_object.quoted_etag,
                )
                response["LocationHeader"] = redirect
                response["StatusCode"] = 303
            except ValueError:
                # If S3 cannot interpret the URL, it acts as if the field is not present.
                response["StatusCode"] = form.get("success_action_status", 204)

        elif status_code := form.get("success_action_status"):
            response["StatusCode"] = status_code
        else:
            response["StatusCode"] = 204

        response["LocationHeader"] = response.get(
            "LocationHeader",
            get_url_encoded_object_location(bucket, object_key),
        )

        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
            response["ChecksumType"] = ChecksumType.FULL_OBJECT

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them every time we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        if response["StatusCode"] == "201":
            # if the StatusCode is 201, S3 returns an XML body with additional information
            response["ETag"] = s3_object.quoted_etag
            response["Bucket"] = bucket
            response["Key"] = object_key
            response["Location"] = response["LocationHeader"]

        return response

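    # Illustrative sketch (not part of the provider): post_object above is what serves a
    # browser-style presigned POST upload. The client side could look like this; the endpoint
    # URL and key are assumptions.
    #
    #   import requests
    #
    #   presigned = s3.generate_presigned_post(Bucket="example-bucket", Key="upload.txt")
    #   requests.post(
    #       presigned["url"],
    #       data=presigned["fields"],
    #       files={"file": ("upload.txt", b"hello from a POST upload")},
    #   )
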
    def put_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        metrics_configuration: MetricsConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Update or add a new metrics configuration. If the provided `id` already exists, its associated configuration
        will be overwritten. The total number of metrics configurations is limited to 1000. If this limit is exceeded,
        an error is raised unless the `id` already exists.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: Identifies the metrics configuration being added or updated.
        :param metrics_configuration: A new or updated configuration associated with the given metrics identifier.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises TooManyConfigurations: If the total number of metrics configurations exceeds 1000 AND the provided
            `id` does not already exist.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if (
            len(s3_bucket.metric_configurations) >= 1000
            and id not in s3_bucket.metric_configurations
        ):
            raise TooManyConfigurations("Too many metrics configurations")
        s3_bucket.metric_configurations[id] = metrics_configuration

    def get_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketMetricsConfigurationOutput:
        """
        Retrieve the metrics configuration associated with a given metrics identifier.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: The unique identifier of the metrics configuration to retrieve.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The metrics configuration associated with the given metrics identifier.
        :raises NoSuchConfiguration: If the provided metrics configuration does not exist.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        metric_config = s3_bucket.metric_configurations.get(id)
        if not metric_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")
        return GetBucketMetricsConfigurationOutput(MetricsConfiguration=metric_config)

    def list_bucket_metrics_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketMetricsConfigurationsOutput:
        """
        Lists the metric configurations available, allowing for pagination using a continuation token to retrieve more
        results.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param continuation_token: An optional continuation token to retrieve the next set of results in case there are
            more results than the default limit. Provided as a base64-encoded string value.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: A list of metric configurations and an optional continuation token for fetching subsequent data, if
            applicable.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        metrics_configurations: list[MetricsConfiguration] = []
        next_continuation_token = None

        decoded_continuation_token = (
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
            if continuation_token
            else None
        )

        for metric in sorted(s3_bucket.metric_configurations.values(), key=lambda r: r["Id"]):
            if continuation_token and metric["Id"] < decoded_continuation_token:
                continue

            if len(metrics_configurations) >= 100:
                next_continuation_token = to_str(base64.urlsafe_b64encode(metric["Id"].encode()))
                break

            metrics_configurations.append(metric)

        return ListBucketMetricsConfigurationsOutput(
            IsTruncated=next_continuation_token is not None,
            ContinuationToken=continuation_token,
            NextContinuationToken=next_continuation_token,
            MetricsConfigurationList=metrics_configurations,
        )

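    # Illustrative sketch (not part of the provider): paging through metrics configurations
    # using the continuation token returned by the handler above (pages of 100 entries). The
    # bucket name is an assumption.
    #
    #   token = None
    #   while True:
    #       kwargs = {"ContinuationToken": token} if token else {}
    #       page = s3.list_bucket_metrics_configurations(Bucket="example-bucket", **kwargs)
    #       for cfg in page.get("MetricsConfigurationList", []):
    #           print(cfg["Id"])
    #       if not page.get("IsTruncated"):
    #           break
    #       token = page["NextContinuationToken"]
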
    def delete_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Removes a specific metrics configuration identified by its metrics ID.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: The unique identifier of the metrics configuration to delete.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises NoSuchConfiguration: If the provided metrics configuration does not exist.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        deleted_config = s3_bucket.metric_configurations.pop(id, None)
        if not deleted_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")


def generate_version_id(bucket_versioning_status: str) -> str | None:
    if not bucket_versioning_status:
        return None
    elif bucket_versioning_status.lower() == "enabled":
        return generate_safe_version_id()
    else:
        return "null"


def add_encryption_to_response(response: dict, s3_object: S3Object):
    if encryption := s3_object.encryption:
        response["ServerSideEncryption"] = encryption
        if encryption == ServerSideEncryption.aws_kms:
            response["SSEKMSKeyId"] = s3_object.kms_key_id
            if s3_object.bucket_key_enabled:
                response["BucketKeyEnabled"] = s3_object.bucket_key_enabled


def get_encryption_parameters_from_request_and_bucket(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
    store: S3Store,
) -> EncryptionParameters:
    if request.get("SSECustomerKey"):
        # we return early, because ServerSideEncryption does not apply if the request has SSE-C
        return EncryptionParameters(None, None, False)

    encryption = request.get("ServerSideEncryption")
    kms_key_id = request.get("SSEKMSKeyId")
    bucket_key_enabled = request.get("BucketKeyEnabled")
    if s3_bucket.encryption_rule:
        bucket_key_enabled = bucket_key_enabled or s3_bucket.encryption_rule.get("BucketKeyEnabled")
        encryption = (
            encryption
            or s3_bucket.encryption_rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
        )
        if encryption == ServerSideEncryption.aws_kms:
            key_id = kms_key_id or s3_bucket.encryption_rule[
                "ApplyServerSideEncryptionByDefault"
            ].get("KMSMasterKeyID")
            kms_key_id = get_kms_key_arn(
                key_id, s3_bucket.bucket_account_id, s3_bucket.bucket_region
            )
            if not kms_key_id:
                # if no key is provided, AWS will use an AWS managed KMS key
                # create it if it doesn't already exist, and save it in the store per region
                if not store.aws_managed_kms_key_id:
                    managed_kms_key_id = create_s3_kms_managed_key_for_region(
                        s3_bucket.bucket_account_id, s3_bucket.bucket_region
                    )
                    store.aws_managed_kms_key_id = managed_kms_key_id

                kms_key_id = store.aws_managed_kms_key_id

    return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)


def get_object_lock_parameters_from_bucket_and_request(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
):
    lock_mode = request.get("ObjectLockMode")
    lock_legal_status = request.get("ObjectLockLegalHoldStatus")
    lock_until = request.get("ObjectLockRetainUntilDate")

    if lock_mode and not lock_until:
        raise InvalidArgument(
            "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied",
            ArgumentName="x-amz-object-lock-retain-until-date",
        )
    elif not lock_mode and lock_until:
        raise InvalidArgument(
            "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied",
            ArgumentName="x-amz-object-lock-mode",
        )

    if lock_mode and lock_mode not in OBJECT_LOCK_MODES:
        raise InvalidArgument(
            "Unknown wormMode directive.",
            ArgumentName="x-amz-object-lock-mode",
            ArgumentValue=lock_mode,
        )

    if (default_retention := s3_bucket.object_lock_default_retention) and not lock_mode:
        lock_mode = default_retention["Mode"]
        lock_until = get_retention_from_now(
            days=default_retention.get("Days"),
            years=default_retention.get("Years"),
        )

    return ObjectLockParameters(lock_until, lock_legal_status, lock_mode)


def get_part_range(s3_object: S3Object, part_number: PartNumber) -> ObjectRange:
    """
    Calculate the range value from a part Number for an S3 Object
    :param s3_object: S3Object
    :param part_number: the wanted part from the S3Object
    :return: an ObjectRange used to return only a slice of an Object
    """
    if not s3_object.parts:
        if part_number > 1:
            raise InvalidPartNumber(
                "The requested partnumber is not satisfiable",
                PartNumberRequested=part_number,
                ActualPartCount=1,
            )
        return ObjectRange(
            begin=0,
            end=s3_object.size - 1,
            content_length=s3_object.size,
            content_range=f"bytes 0-{s3_object.size - 1}/{s3_object.size}",
        )
    elif not (part_data := s3_object.parts.get(str(part_number))):
        raise InvalidPartNumber(
            "The requested partnumber is not satisfiable",
            PartNumberRequested=part_number,
            ActualPartCount=len(s3_object.parts),
        )

    # TODO: remove for next major version 5.0, compatibility for <= 4.5
    if isinstance(part_data, tuple):
        begin, part_length = part_data
    else:
        begin = part_data["_position"]
        part_length = part_data["Size"]

    end = begin + part_length - 1
    return ObjectRange(
        begin=begin,
        end=end,
        content_length=part_length,
        content_range=f"bytes {begin}-{end}/{s3_object.size}",
    )


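# Illustrative sketch (not part of the provider): get_part_range above backs ranged reads such
# as GetObject with PartNumber, which returns only the bytes of that part of a completed
# multipart upload. The bucket and key names are assumptions.
#
#   part = s3.get_object(Bucket="example-bucket", Key="large-object", PartNumber=2)
#   print(part["ContentRange"], part["ContentLength"])

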
def get_acl_headers_from_request(
1✔
4911
    request: PutObjectRequest
4912
    | CreateMultipartUploadRequest
4913
    | CopyObjectRequest
4914
    | CreateBucketRequest
4915
    | PutBucketAclRequest
4916
    | PutObjectAclRequest,
4917
) -> list[tuple[str, str]]:
4918
    permission_keys = [
1✔
4919
        "GrantFullControl",
4920
        "GrantRead",
4921
        "GrantReadACP",
4922
        "GrantWrite",
4923
        "GrantWriteACP",
4924
    ]
4925
    acl_headers = [
1✔
4926
        (permission, grant_header)
4927
        for permission in permission_keys
4928
        if (grant_header := request.get(permission))
4929
    ]
4930
    return acl_headers
1✔
4931

4932

4933
def get_access_control_policy_from_acl_request(
    request: PutBucketAclRequest | PutObjectAclRequest,
    owner: Owner,
    request_body: bytes,
) -> AccessControlPolicy:
    canned_acl = request.get("ACL")
    acl_headers = get_acl_headers_from_request(request)

    # FIXME: this is very dirty, but the parser does not differentiate between an empty body and an empty XML node
    # errors are different depending on that data, so we need to access the context. Modifying the parser for this
    # use case seems dangerous
    is_acp_in_body = request_body

    if not (canned_acl or acl_headers or is_acp_in_body):
        raise MissingSecurityHeader(
            "Your request was missing a required header", MissingHeaderName="x-amz-acl"
        )

    elif canned_acl and acl_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    elif (canned_acl or acl_headers) and is_acp_in_body:
        raise UnexpectedContent("This request does not support content")

    if canned_acl:
        validate_canned_acl(canned_acl)
        acp = get_canned_acl(canned_acl, owner=owner)

    elif acl_headers:
        grants = []
        for permission, grantees_values in acl_headers:
            permission = get_permission_from_header(permission)
            partial_grants = parse_grants_in_headers(permission, grantees_values)
            grants.extend(partial_grants)

        acp = AccessControlPolicy(Owner=owner, Grants=grants)
    else:
        acp = request.get("AccessControlPolicy")
        validate_acl_acp(acp)
        if (
            owner.get("DisplayName")
            and acp["Grants"]
            and "DisplayName" not in acp["Grants"][0]["Grantee"]
        ):
            acp["Grants"][0]["Grantee"]["DisplayName"] = owner["DisplayName"]

    return acp


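# Summary of the branching above, for readers of this helper: a PutBucketAcl/PutObjectAcl
# request must carry exactly one ACL source.
#
#     canned ACL (x-amz-acl) only         -> get_canned_acl()
#     x-amz-grant-* headers only          -> parse_grants_in_headers() per permission
#     XML AccessControlPolicy body only   -> validate_acl_acp() on the parsed body
#     none of the three                   -> MissingSecurityHeader (x-amz-acl)
#     canned ACL + grant headers          -> InvalidRequest
#     (canned ACL or headers) + body      -> UnexpectedContent

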
def get_access_control_policy_for_new_resource_request(
    request: PutObjectRequest
    | CreateMultipartUploadRequest
    | CopyObjectRequest
    | CreateBucketRequest,
    owner: Owner,
) -> AccessControlPolicy:
    # TODO: this is basic ACL, not taking into account Bucket settings. Revisit once we really implement ACLs.
    canned_acl = request.get("ACL")
    acl_headers = get_acl_headers_from_request(request)

    if not (canned_acl or acl_headers):
        return get_canned_acl(BucketCannedACL.private, owner=owner)

    elif canned_acl and acl_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    if canned_acl:
        validate_canned_acl(canned_acl)
        return get_canned_acl(canned_acl, owner=owner)

    grants = []
    for permission, grantees_values in acl_headers:
        permission = get_permission_from_header(permission)
        partial_grants = parse_grants_in_headers(permission, grantees_values)
        grants.extend(partial_grants)

    return AccessControlPolicy(Owner=owner, Grants=grants)


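# Illustrative note, not part of the provider: when a request carries neither a canned ACL
# nor any grant headers, the newly created bucket or object falls back to the canned
# "private" ACL, i.e. a single FULL_CONTROL grant for the resource owner.

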
def object_exists_for_precondition_write(s3_bucket: S3Bucket, key: ObjectKey) -> bool:
    return (existing := s3_bucket.objects.get(key)) and not isinstance(existing, S3DeleteMarker)


def verify_object_equality_precondition_write(
    s3_bucket: S3Bucket,
    key: ObjectKey,
    etag: str,
    initiated: datetime.datetime | None = None,
) -> None:
    existing = s3_bucket.objects.get(key)
    if not existing or isinstance(existing, S3DeleteMarker):
        raise NoSuchKey("The specified key does not exist.", Key=key)

    if not existing.etag == etag.strip('"'):
        raise PreconditionFailed(
            "At least one of the pre-conditions you specified did not hold",
            Condition="If-Match",
        )

    if initiated and initiated < existing.last_modified:
        raise ConditionalRequestConflict(
            "The conditional request cannot succeed due to a conflicting operation against this resource.",
            Condition="If-Match",
            Key=key,
        )
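# Illustrative sketch, not part of the provider: the two helpers above back S3 conditional
# writes. Assuming a recent boto3 client (named `s3` here, with hypothetical bucket, key,
# and etag values) that exposes the `IfMatch`/`IfNoneMatch` parameters on put_object, a
# stale ETag is rejected with a 412 PreconditionFailed, and writing to an existing key
# with If-None-Match="*" is rejected as well:
#
#     s3.put_object(Bucket="my-bucket", Key="my-key", Body=b"v2", IfMatch=current_etag)
#     s3.put_object(Bucket="my-bucket", Key="my-key", Body=b"v1", IfNoneMatch="*")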