• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17846762595

18 Sep 2025 02:49PM UTC coverage: 86.844% (-0.005%) from 86.849%
17846762595

push

github

web-flow
update old references to venv paths (#13163)

67603 of 77844 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.65
/localstack-core/localstack/services/s3/provider.py
1
import base64
1✔
2
import copy
1✔
3
import datetime
1✔
4
import json
1✔
5
import logging
1✔
6
import re
1✔
7
from collections import defaultdict
1✔
8
from inspect import signature
1✔
9
from io import BytesIO
1✔
10
from operator import itemgetter
1✔
11
from typing import IO
1✔
12
from urllib import parse as urlparse
1✔
13
from zoneinfo import ZoneInfo
1✔
14

15
from localstack import config
1✔
16
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
17
from localstack.aws.api.s3 import (
1✔
18
    MFA,
19
    AbortMultipartUploadOutput,
20
    AccelerateConfiguration,
21
    AccessControlPolicy,
22
    AccessDenied,
23
    AccountId,
24
    AnalyticsConfiguration,
25
    AnalyticsId,
26
    BadDigest,
27
    Body,
28
    Bucket,
29
    BucketAlreadyExists,
30
    BucketAlreadyOwnedByYou,
31
    BucketCannedACL,
32
    BucketLifecycleConfiguration,
33
    BucketLoggingStatus,
34
    BucketName,
35
    BucketNotEmpty,
36
    BucketRegion,
37
    BucketVersioningStatus,
38
    BypassGovernanceRetention,
39
    ChecksumAlgorithm,
40
    ChecksumCRC32,
41
    ChecksumCRC32C,
42
    ChecksumCRC64NVME,
43
    ChecksumSHA1,
44
    ChecksumSHA256,
45
    ChecksumType,
46
    CommonPrefix,
47
    CompletedMultipartUpload,
48
    CompleteMultipartUploadOutput,
49
    ConditionalRequestConflict,
50
    ConfirmRemoveSelfBucketAccess,
51
    ContentMD5,
52
    CopyObjectOutput,
53
    CopyObjectRequest,
54
    CopyObjectResult,
55
    CopyPartResult,
56
    CORSConfiguration,
57
    CreateBucketOutput,
58
    CreateBucketRequest,
59
    CreateMultipartUploadOutput,
60
    CreateMultipartUploadRequest,
61
    CrossLocationLoggingProhibitted,
62
    Delete,
63
    DeletedObject,
64
    DeleteMarkerEntry,
65
    DeleteObjectOutput,
66
    DeleteObjectsOutput,
67
    DeleteObjectTaggingOutput,
68
    Delimiter,
69
    EncodingType,
70
    Error,
71
    Expiration,
72
    FetchOwner,
73
    GetBucketAccelerateConfigurationOutput,
74
    GetBucketAclOutput,
75
    GetBucketAnalyticsConfigurationOutput,
76
    GetBucketCorsOutput,
77
    GetBucketEncryptionOutput,
78
    GetBucketIntelligentTieringConfigurationOutput,
79
    GetBucketInventoryConfigurationOutput,
80
    GetBucketLifecycleConfigurationOutput,
81
    GetBucketLocationOutput,
82
    GetBucketLoggingOutput,
83
    GetBucketMetricsConfigurationOutput,
84
    GetBucketOwnershipControlsOutput,
85
    GetBucketPolicyOutput,
86
    GetBucketPolicyStatusOutput,
87
    GetBucketReplicationOutput,
88
    GetBucketRequestPaymentOutput,
89
    GetBucketTaggingOutput,
90
    GetBucketVersioningOutput,
91
    GetBucketWebsiteOutput,
92
    GetObjectAclOutput,
93
    GetObjectAttributesOutput,
94
    GetObjectAttributesParts,
95
    GetObjectAttributesRequest,
96
    GetObjectLegalHoldOutput,
97
    GetObjectLockConfigurationOutput,
98
    GetObjectOutput,
99
    GetObjectRequest,
100
    GetObjectRetentionOutput,
101
    GetObjectTaggingOutput,
102
    GetObjectTorrentOutput,
103
    GetPublicAccessBlockOutput,
104
    HeadBucketOutput,
105
    HeadObjectOutput,
106
    HeadObjectRequest,
107
    IfMatch,
108
    IfMatchInitiatedTime,
109
    IfMatchLastModifiedTime,
110
    IfMatchSize,
111
    IfNoneMatch,
112
    IntelligentTieringConfiguration,
113
    IntelligentTieringId,
114
    InvalidArgument,
115
    InvalidBucketName,
116
    InvalidDigest,
117
    InvalidLocationConstraint,
118
    InvalidObjectState,
119
    InvalidPartNumber,
120
    InvalidPartOrder,
121
    InvalidStorageClass,
122
    InvalidTargetBucketForLogging,
123
    InventoryConfiguration,
124
    InventoryId,
125
    KeyMarker,
126
    LifecycleRules,
127
    ListBucketAnalyticsConfigurationsOutput,
128
    ListBucketIntelligentTieringConfigurationsOutput,
129
    ListBucketInventoryConfigurationsOutput,
130
    ListBucketMetricsConfigurationsOutput,
131
    ListBucketsOutput,
132
    ListMultipartUploadsOutput,
133
    ListObjectsOutput,
134
    ListObjectsV2Output,
135
    ListObjectVersionsOutput,
136
    ListPartsOutput,
137
    Marker,
138
    MaxBuckets,
139
    MaxKeys,
140
    MaxParts,
141
    MaxUploads,
142
    MethodNotAllowed,
143
    MetricsConfiguration,
144
    MetricsId,
145
    MissingSecurityHeader,
146
    MpuObjectSize,
147
    MultipartUpload,
148
    MultipartUploadId,
149
    NoSuchBucket,
150
    NoSuchBucketPolicy,
151
    NoSuchCORSConfiguration,
152
    NoSuchKey,
153
    NoSuchLifecycleConfiguration,
154
    NoSuchPublicAccessBlockConfiguration,
155
    NoSuchTagSet,
156
    NoSuchUpload,
157
    NoSuchWebsiteConfiguration,
158
    NotificationConfiguration,
159
    Object,
160
    ObjectIdentifier,
161
    ObjectKey,
162
    ObjectLockConfiguration,
163
    ObjectLockConfigurationNotFoundError,
164
    ObjectLockEnabled,
165
    ObjectLockLegalHold,
166
    ObjectLockMode,
167
    ObjectLockRetention,
168
    ObjectLockToken,
169
    ObjectOwnership,
170
    ObjectPart,
171
    ObjectVersion,
172
    ObjectVersionId,
173
    ObjectVersionStorageClass,
174
    OptionalObjectAttributesList,
175
    Owner,
176
    OwnershipControls,
177
    OwnershipControlsNotFoundError,
178
    Part,
179
    PartNumber,
180
    PartNumberMarker,
181
    Policy,
182
    PostResponse,
183
    PreconditionFailed,
184
    Prefix,
185
    PublicAccessBlockConfiguration,
186
    PutBucketAclRequest,
187
    PutBucketLifecycleConfigurationOutput,
188
    PutObjectAclOutput,
189
    PutObjectAclRequest,
190
    PutObjectLegalHoldOutput,
191
    PutObjectLockConfigurationOutput,
192
    PutObjectOutput,
193
    PutObjectRequest,
194
    PutObjectRetentionOutput,
195
    PutObjectTaggingOutput,
196
    ReplicationConfiguration,
197
    ReplicationConfigurationNotFoundError,
198
    RequestPayer,
199
    RequestPaymentConfiguration,
200
    RestoreObjectOutput,
201
    RestoreRequest,
202
    S3Api,
203
    ServerSideEncryption,
204
    ServerSideEncryptionConfiguration,
205
    SkipValidation,
206
    SSECustomerAlgorithm,
207
    SSECustomerKey,
208
    SSECustomerKeyMD5,
209
    StartAfter,
210
    StorageClass,
211
    Tagging,
212
    Token,
213
    TransitionDefaultMinimumObjectSize,
214
    UploadIdMarker,
215
    UploadPartCopyOutput,
216
    UploadPartCopyRequest,
217
    UploadPartOutput,
218
    UploadPartRequest,
219
    VersionIdMarker,
220
    VersioningConfiguration,
221
    WebsiteConfiguration,
222
)
223
from localstack.aws.api.s3 import NotImplemented as NotImplementedException
1✔
224
from localstack.aws.handlers import (
1✔
225
    modify_service_response,
226
    preprocess_request,
227
    serve_custom_service_request_handlers,
228
)
229
from localstack.constants import AWS_REGION_US_EAST_1
1✔
230
from localstack.services.edge import ROUTER
1✔
231
from localstack.services.plugins import ServiceLifecycleHook
1✔
232
from localstack.services.s3.codec import AwsChunkedDecoder
1✔
233
from localstack.services.s3.constants import (
1✔
234
    ALLOWED_HEADER_OVERRIDES,
235
    ARCHIVES_STORAGE_CLASSES,
236
    CHECKSUM_ALGORITHMS,
237
    DEFAULT_BUCKET_ENCRYPTION,
238
)
239
from localstack.services.s3.cors import S3CorsHandler, s3_cors_request_handler
1✔
240
from localstack.services.s3.exceptions import (
1✔
241
    InvalidBucketOwnerAWSAccountID,
242
    InvalidBucketState,
243
    InvalidRequest,
244
    MalformedPolicy,
245
    MalformedXML,
246
    NoSuchConfiguration,
247
    NoSuchObjectLockConfiguration,
248
    TooManyConfigurations,
249
    UnexpectedContent,
250
)
251
from localstack.services.s3.models import (
1✔
252
    BucketCorsIndex,
253
    EncryptionParameters,
254
    ObjectLockParameters,
255
    S3Bucket,
256
    S3DeleteMarker,
257
    S3Multipart,
258
    S3Object,
259
    S3Part,
260
    S3Store,
261
    VersionedKeyStore,
262
    s3_stores,
263
)
264
from localstack.services.s3.notifications import NotificationDispatcher, S3EventNotificationContext
1✔
265
from localstack.services.s3.presigned_url import validate_post_policy
1✔
266
from localstack.services.s3.storage.core import LimitedIterableStream, S3ObjectStore
1✔
267
from localstack.services.s3.storage.ephemeral import EphemeralS3ObjectStore
1✔
268
from localstack.services.s3.utils import (
1✔
269
    ObjectRange,
270
    add_expiration_days_to_datetime,
271
    base_64_content_md5_to_etag,
272
    create_redirect_for_post_request,
273
    create_s3_kms_managed_key_for_region,
274
    etag_to_base_64_content_md5,
275
    extract_bucket_key_version_id_from_copy_source,
276
    generate_safe_version_id,
277
    get_canned_acl,
278
    get_class_attrs_from_spec_class,
279
    get_failed_precondition_copy_source,
280
    get_failed_upload_part_copy_source_preconditions,
281
    get_full_default_bucket_location,
282
    get_kms_key_arn,
283
    get_lifecycle_rule_from_object,
284
    get_owner_for_account_id,
285
    get_permission_from_header,
286
    get_retention_from_now,
287
    get_s3_checksum_algorithm_from_request,
288
    get_s3_checksum_algorithm_from_trailing_headers,
289
    get_system_metadata_from_request,
290
    get_unique_key_id,
291
    is_bucket_name_valid,
292
    is_version_older_than_other,
293
    parse_copy_source_range_header,
294
    parse_post_object_tagging_xml,
295
    parse_range_header,
296
    parse_tagging_header,
297
    s3_response_handler,
298
    serialize_expiration_header,
299
    str_to_rfc_1123_datetime,
300
    validate_dict_fields,
301
    validate_failed_precondition,
302
    validate_kms_key_id,
303
    validate_tag_set,
304
)
305
from localstack.services.s3.validation import (
1✔
306
    parse_grants_in_headers,
307
    validate_acl_acp,
308
    validate_bucket_analytics_configuration,
309
    validate_bucket_intelligent_tiering_configuration,
310
    validate_canned_acl,
311
    validate_checksum_value,
312
    validate_cors_configuration,
313
    validate_inventory_configuration,
314
    validate_lifecycle_configuration,
315
    validate_object_key,
316
    validate_sse_c,
317
    validate_website_configuration,
318
)
319
from localstack.services.s3.website_hosting import register_website_hosting_routes
1✔
320
from localstack.state import AssetDirectory, StateVisitor
1✔
321
from localstack.utils.aws.arns import s3_bucket_name
1✔
322
from localstack.utils.collections import select_from_typed_dict
1✔
323
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
324

325
LOG = logging.getLogger(__name__)
1✔
326

327
STORAGE_CLASSES = get_class_attrs_from_spec_class(StorageClass)
1✔
328
SSE_ALGORITHMS = get_class_attrs_from_spec_class(ServerSideEncryption)
1✔
329
OBJECT_OWNERSHIPS = get_class_attrs_from_spec_class(ObjectOwnership)
1✔
330
OBJECT_LOCK_MODES = get_class_attrs_from_spec_class(ObjectLockMode)
1✔
331

332
DEFAULT_S3_TMP_DIR = "/tmp/localstack-s3-storage"
1✔
333

334

335
class S3Provider(S3Api, ServiceLifecycleHook):
1✔
336
    def __init__(self, storage_backend: S3ObjectStore = None) -> None:
        """
        Set up the provider's collaborators.

        :param storage_backend: the object store to use; when none is given, an ``EphemeralS3ObjectStore``
            rooted at ``DEFAULT_S3_TMP_DIR`` is created
        """
        super().__init__()
        if not storage_backend:
            storage_backend = EphemeralS3ObjectStore(DEFAULT_S3_TMP_DIR)
        self._storage_backend = storage_backend
        self._notification_dispatcher = NotificationDispatcher()
        cors_index = BucketCorsIndex()
        self._cors_handler = S3CorsHandler(cors_index)

        # Runtime cache of the Lifecycle Expiration headers, keyed by bucket name and then by object key.
        # The headers need to be calculated every time we fetch an object, in case the rules have changed.
        self._expiration_cache: dict[BucketName, dict[ObjectKey, Expiration]] = defaultdict(dict)
1✔
345

346
    def on_after_init(self):
        """
        Register the S3-specific handlers once the provider is initialized: the CORS handler is appended to
        ``preprocess_request``, the CORS request handler to ``serve_custom_service_request_handlers``, the S3
        response handler to ``modify_service_response``, and the website hosting routes to ``ROUTER``.
        """
        cors_handler = self._cors_handler
        preprocess_request.append(cors_handler)
        serve_custom_service_request_handlers.append(s3_cors_request_handler)
        modify_service_response.append(self.service, s3_response_handler)
        register_website_hosting_routes(router=ROUTER)
1✔
351

352
    def accept_state_visitor(self, visitor: StateVisitor):
        """
        Present the S3 state to ``visitor``: first the ``s3_stores``, then an ``AssetDirectory`` pointing at
        the storage backend's root directory.

        :param visitor: the StateVisitor to call ``visit`` on
        """
        visitor.visit(s3_stores)
        backend_root = self._storage_backend.root_directory
        asset_directory = AssetDirectory(self.service, backend_root)
        visitor.visit(asset_directory)
×
355

356
    def on_before_state_save(self):
        """Flush the storage backend before the state gets saved."""
        backend = self._storage_backend
        backend.flush()
×
358

359
    def on_after_state_reset(self):
        """Invalidate the CORS handler cache after the state has been reset."""
        cors_handler = self._cors_handler
        cors_handler.invalidate_cache()
×
361

362
    def on_after_state_load(self):
        """Invalidate the CORS handler cache after a state has been loaded."""
        cors_handler = self._cors_handler
        cors_handler.invalidate_cache()
×
364

365
    def on_before_stop(self):
        """Shut down the notification dispatcher, then close the storage backend."""
        dispatcher = self._notification_dispatcher
        dispatcher.shutdown()
        backend = self._storage_backend
        backend.close()
1✔
368

369
    def _notify(
        self,
        context: RequestContext,
        s3_bucket: S3Bucket,
        s3_object: S3Object | S3DeleteMarker = None,
        s3_notif_ctx: S3EventNotificationContext = None,
    ):
        """
        Send the event notifications configured on the bucket, if there are any.

        :param context: the RequestContext, used to build the notification context when none is given
        :param s3_bucket: the S3Bucket object, holding the notification configuration
        :param s3_object: the S3Object (or delete marker), used when ``s3_notif_ctx`` is not given
        :param s3_notif_ctx: a pre-built S3EventNotificationContext, in case we need specific data only
            available in the API call
        :return: None
        """
        # nothing to dispatch if the bucket has no notification configuration
        if not s3_bucket.notification_configuration:
            return

        if not s3_notif_ctx:
            s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
                context,
                s3_bucket=s3_bucket,
                s3_object=s3_object,
            )

        self._notification_dispatcher.send_notifications(
            s3_notif_ctx,
            s3_bucket.notification_configuration,
        )
394

395
    def _verify_notification_configuration(
        self,
        notification_configuration: NotificationConfiguration,
        skip_destination_validation: SkipValidation,
        context: RequestContext,
        bucket_name: str,
    ):
        """
        Delegate the verification of a notification configuration to the notification dispatcher.

        :param notification_configuration: the NotificationConfiguration to verify
        :param skip_destination_validation: forwarded as-is to the dispatcher
        :param context: the RequestContext of the current call
        :param bucket_name: the name of the bucket the configuration is for
        """
        dispatcher = self._notification_dispatcher
        dispatcher.verify_configuration(
            notification_configuration,
            skip_destination_validation,
            context,
            bucket_name,
        )
405

406
    def _get_expiration_header(
1✔
407
        self,
408
        lifecycle_rules: LifecycleRules,
409
        bucket: BucketName,
410
        s3_object: S3Object,
411
        object_tags: dict[str, str],
412
    ) -> Expiration:
413
        """
414
        This method will check if the key matches a Lifecycle filter, and return the serializer header if that's
415
        the case. We're caching it because it can change depending on the set rules on the bucket.
416
        We can't use `lru_cache` as the parameters needs to be hashable
417
        :param lifecycle_rules: the bucket LifecycleRules
418
        :param s3_object: S3Object
419
        :param object_tags: the object tags
420
        :return: the Expiration header if there's a rule matching
421
        """
422
        if cached_exp := self._expiration_cache.get(bucket, {}).get(s3_object.key):
1✔
423
            return cached_exp
1✔
424

425
        if lifecycle_rule := get_lifecycle_rule_from_object(
1✔
426
            lifecycle_rules, s3_object.key, s3_object.size, object_tags
427
        ):
428
            expiration_header = serialize_expiration_header(
1✔
429
                lifecycle_rule["ID"],
430
                lifecycle_rule["Expiration"],
431
                s3_object.last_modified,
432
            )
433
            self._expiration_cache[bucket][s3_object.key] = expiration_header
1✔
434
            return expiration_header
1✔
435

436
    def _get_cross_account_bucket(
1✔
437
        self,
438
        context: RequestContext,
439
        bucket_name: BucketName,
440
        *,
441
        expected_bucket_owner: AccountId = None,
442
    ) -> tuple[S3Store, S3Bucket]:
443
        if expected_bucket_owner and not re.fullmatch(r"\w{12}", expected_bucket_owner):
1✔
444
            raise InvalidBucketOwnerAWSAccountID(
1✔
445
                f"The value of the expected bucket owner parameter must be an AWS Account ID... [{expected_bucket_owner}]",
446
            )
447

448
        store = self.get_store(context.account_id, context.region)
1✔
449
        if not (s3_bucket := store.buckets.get(bucket_name)):
1✔
450
            if not (account_id := store.global_bucket_map.get(bucket_name)):
1✔
451
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)
1✔
452

453
            store = self.get_store(account_id, context.region)
1✔
454
            if not (s3_bucket := store.buckets.get(bucket_name)):
1✔
455
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)
×
456

457
        if expected_bucket_owner and s3_bucket.bucket_account_id != expected_bucket_owner:
1✔
458
            raise AccessDenied("Access Denied")
1✔
459

460
        return store, s3_bucket
1✔
461

462
    @staticmethod
    def get_store(account_id: str, region_name: str) -> S3Store:
        """
        Look up the S3 store in ``s3_stores`` for the given account and region.

        :param account_id: the account id, first-level key into ``s3_stores``
        :param region_name: the region name, second-level key into ``s3_stores``
        :return: the S3Store found under these two keys
        """
        # Use default account id for external access? would need an anonymous one
        account_stores = s3_stores[account_id]
        return account_stores[region_name]
1✔
466

467
    @handler("CreateBucket", expand=False)
    def create_bucket(
        self,
        context: RequestContext,
        request: CreateBucketRequest,
    ) -> CreateBucketOutput:
        """
        Create a bucket: validate its name, location constraint and object ownership, then register it in
        the store, the global bucket map and the storage backend.

        :param context: the RequestContext, providing the account id, region and raw request body
        :param request: the CreateBucketRequest
        :return: a CreateBucketOutput containing the bucket Location
        :raises InvalidBucketName: if the bucket name is not valid
        :raises MalformedXML: if a CreateBucketConfiguration is sent without a LocationConstraint
        :raises InvalidLocationConstraint: if "us-east-1" is given as constraint on a us-east-1 request
        :raises CommonServiceException: (IllegalLocationConstraintException) if the constraint and the request
            region are incompatible
        :raises BucketAlreadyExists: if the name is registered for another account
        :raises BucketAlreadyOwnedByYou: if the caller already owns the bucket outside of us-east-1
        :raises InvalidArgument: if the ObjectOwnership value is not a known one
        """
        bucket_name = request["Bucket"]
        if not is_bucket_name_valid(bucket_name):
            raise InvalidBucketName("The specified bucket is not valid.", BucketName=bucket_name)

        # the XML parser returns an empty dict if the body contains the following:
        # <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />
        # but it also returns an empty dict if the body is fully empty. We differentiate the 2 cases by only
        # reading the parsed configuration when the body is not empty
        bucket_configuration = (
            request.get("CreateBucketConfiguration") if context.request.data else None
        )

        if bucket_configuration is None:
            bucket_region = AWS_REGION_US_EAST_1
            if context.region != bucket_region:
                raise CommonServiceException(
                    code="IllegalLocationConstraintException",
                    message="The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.",
                )
        else:
            bucket_region = bucket_configuration.get("LocationConstraint")
            if not bucket_region:
                raise MalformedXML()

            if context.region == AWS_REGION_US_EAST_1:
                if bucket_region == "us-east-1":
                    raise InvalidLocationConstraint(
                        "The specified location-constraint is not valid",
                        LocationConstraint=bucket_region,
                    )
            elif context.region != bucket_region:
                raise CommonServiceException(
                    code="IllegalLocationConstraintException",
                    message=f"The {bucket_region} location constraint is incompatible for the region specific endpoint this request was sent to.",
                )

        store = self.get_store(context.account_id, bucket_region)

        if bucket_name in store.global_bucket_map:
            if store.global_bucket_map[bucket_name] != context.account_id:
                raise BucketAlreadyExists()

            # if the existing bucket has the same owner, the behaviour will depend on the region
            if bucket_region == "us-east-1":
                # CreateBucket is idempotent in us-east-1
                return CreateBucketOutput(Location=f"/{bucket_name}")

            raise BucketAlreadyOwnedByYou(
                "Your previous request to create the named bucket succeeded and you already own it.",
                BucketName=bucket_name,
            )

        object_ownership = request.get("ObjectOwnership")
        if object_ownership is not None and object_ownership not in OBJECT_OWNERSHIPS:
            raise InvalidArgument(
                f"Invalid x-amz-object-ownership header: {object_ownership}",
                ArgumentName="x-amz-object-ownership",
            )

        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        owner = get_owner_for_account_id(context.account_id)
        acl = get_access_control_policy_for_new_resource_request(request, owner=owner)
        s3_bucket = S3Bucket(
            name=bucket_name,
            account_id=context.account_id,
            bucket_region=bucket_region,
            owner=owner,
            acl=acl,
            object_ownership=object_ownership,
            object_lock_enabled_for_bucket=request.get("ObjectLockEnabledForBucket"),
        )

        store.buckets[bucket_name] = s3_bucket
        store.global_bucket_map[bucket_name] = s3_bucket.bucket_account_id
        self._cors_handler.invalidate_cache()
        self._storage_backend.create_bucket(bucket_name)

        # Location is always contained in response -> full url for LocationConstraint outside us-east-1
        if bucket_region == "us-east-1":
            location = f"/{bucket_name}"
        else:
            location = get_full_default_bucket_location(bucket_name)

        return CreateBucketOutput(Location=location)
1✔
557

558
    def delete_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Delete an empty bucket and clean up everything that references it.

        :param context: the RequestContext of the current call
        :param bucket: the name of the bucket to delete
        :param expected_bucket_owner: if given, the account id the bucket is expected to belong to; it is
            validated by ``_get_cross_account_bucket``
        :raises BucketNotEmpty: if the bucket still contains objects
        :raises InvalidBucketOwnerAWSAccountID: raised by the lookup if ``expected_bucket_owner`` is malformed
        :raises AccessDenied: raised by the lookup if the bucket is not owned by ``expected_bucket_owner``
        :raises NoSuchBucket: raised by the lookup if the bucket does not exist
        """
        # forward the expected owner so that the lookup helper can enforce it, instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # the bucket still contains objects
        if not s3_bucket.objects.is_empty():
            message = "The bucket you tried to delete is not empty"
            if s3_bucket.versioning_status:
                message += ". You must delete all versions in the bucket."
            raise BucketNotEmpty(
                message,
                BucketName=bucket,
            )

        store.buckets.pop(bucket)
        store.global_bucket_map.pop(bucket)
        self._cors_handler.invalidate_cache()
        # drop the cached Expiration headers of the bucket, if any
        self._expiration_cache.pop(bucket, None)
        # clean up the storage backend
        self._storage_backend.delete_bucket(bucket)
1✔
583

584
    def list_buckets(
        self,
        context: RequestContext,
        max_buckets: MaxBuckets = None,
        continuation_token: Token = None,
        prefix: Prefix = None,
        bucket_region: BucketRegion = None,
        **kwargs,
    ) -> ListBucketsOutput:
        """
        List the buckets of the caller's store, sorted by name, with optional filtering and pagination.

        :param context: the RequestContext, providing the account id and region of the caller
        :param max_buckets: if set, the maximum number of buckets to return
        :param continuation_token: url-safe base64 encoded bucket name to resume the listing from
        :param prefix: if set, only buckets whose name starts with it are returned
        :param bucket_region: if set, only buckets of that region are returned
        :return: a ListBucketsOutput; its ContinuationToken is set when the listing was cut by ``max_buckets``
        """
        owner = get_owner_for_account_id(context.account_id)
        store = self.get_store(context.account_id, context.region)

        # the token is the encoded name of the first bucket that did not fit in the previous page
        resume_from = None
        if continuation_token:
            resume_from = to_str(base64.urlsafe_b64decode(continuation_token.encode()))

        buckets: list[Bucket] = []
        next_continuation_token = None

        # Comparing strings with case sensitivity since AWS is case-sensitive
        for s3_bucket in sorted(store.buckets.values(), key=lambda b: b.name):
            name = s3_bucket.name
            is_filtered_out = (
                (resume_from is not None and name < resume_from)
                or (prefix and not name.startswith(prefix))
                or (bucket_region and s3_bucket.bucket_region != bucket_region)
            )
            if is_filtered_out:
                continue

            if max_buckets and len(buckets) >= max_buckets:
                next_continuation_token = to_str(base64.urlsafe_b64encode(name.encode()))
                break

            buckets.append(
                Bucket(
                    Name=name,
                    CreationDate=s3_bucket.creation_date,
                    BucketRegion=s3_bucket.bucket_region,
                )
            )

        return ListBucketsOutput(
            Owner=owner,
            Buckets=buckets,
            Prefix=prefix,
            ContinuationToken=next_continuation_token,
        )
632

633
    def head_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> HeadBucketOutput:
        """
        Check that a bucket exists, in the caller's store or in the store of the account registered for that
        name in the global bucket map, and return its region.

        :param context: the RequestContext, providing the account id and region of the caller
        :param bucket: the name of the bucket
        :param expected_bucket_owner: accepted but not used by this implementation
        :return: a HeadBucketOutput with the BucketRegion
        :raises NoSuchBucket: if the bucket cannot be found
        """
        store = self.get_store(context.account_id, context.region)
        s3_bucket = store.buckets.get(bucket)
        if not s3_bucket:
            owner_account_id = store.global_bucket_map.get(bucket)
            if not owner_account_id:
                # just to return the 404 error message
                raise NoSuchBucket()

            owner_store = self.get_store(owner_account_id, context.region)
            s3_bucket = owner_store.buckets.get(bucket)
            if not s3_bucket:
                # just to return the 404 error message
                raise NoSuchBucket()

        # TODO: this call is also used to check if the user has access/authorization for the bucket
        #  it can return 403
        return HeadBucketOutput(BucketRegion=s3_bucket.bucket_region)
1✔
654

655
    def get_bucket_location(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLocationOutput:
        """
        Return the location constraint of the bucket as a raw XML payload.

        When implementing the ASF provider, this operation is implemented because:
        - The spec defines a root element GetBucketLocationOutput containing a LocationConstraint member, where
          S3 actually just returns the LocationConstraint on the root level (only operation so far that we know of).
        - We circumvent the root level element here by patching the spec such that this operation returns a
          single "payload" (the XML body response), which causes the serializer to directly take the payload element.
        - The above "hack" causes the fix in the serializer to not be picked up here as we're passing the XML body as
          the payload, which is why we need to manually do this here by building the string.
        Botocore implements this hack for parsing the response in `botocore.handlers.py#parse_get_bucket_location`

        :param context: the RequestContext of the current call
        :param bucket: the name of the bucket
        :param expected_bucket_owner: if given, the account id the bucket is expected to belong to; it is
            validated by ``_get_cross_account_bucket``
        :return: a GetBucketLocationOutput whose LocationConstraint is the full XML body; the element is empty
            for buckets in us-east-1
        """
        # forward the expected owner so that the lookup helper can enforce it, instead of silently dropping it
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # us-east-1 is represented by an empty LocationConstraint element
        location = s3_bucket.bucket_region if s3_bucket.bucket_region != "us-east-1" else ""
        location_constraint = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            f'<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">{location}</LocationConstraint>'
        )

        return GetBucketLocationOutput(LocationConstraint=location_constraint)
1✔
684

685
    @handler("PutObject", expand=False)
1✔
686
    def put_object(
1✔
687
        self,
688
        context: RequestContext,
689
        request: PutObjectRequest,
690
    ) -> PutObjectOutput:
691
        # TODO: validate order of validation
692
        # TODO: still need to handle following parameters
693
        #  request_payer: RequestPayer = None,
694
        bucket_name = request["Bucket"]
1✔
695
        key = request["Key"]
1✔
696
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
697

698
        if (storage_class := request.get("StorageClass")) is not None and (
1✔
699
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
700
        ):
701
            raise InvalidStorageClass(
1✔
702
                "The storage class you specified is not valid", StorageClassRequested=storage_class
703
            )
704

705
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
706
            validate_kms_key_id(sse_kms_key_id, s3_bucket)
1✔
707

708
        validate_object_key(key)
1✔
709

710
        if_match = request.get("IfMatch")
1✔
711
        if (if_none_match := request.get("IfNoneMatch")) and if_match:
1✔
712
            raise NotImplementedException(
713
                "A header you provided implies functionality that is not implemented",
714
                Header="If-Match,If-None-Match",
715
                additionalMessage="Multiple conditional request headers present in the request",
716
            )
717

718
        elif (if_none_match and if_none_match != "*") or (if_match and if_match == "*"):
1✔
719
            header_name = "If-None-Match" if if_none_match else "If-Match"
1✔
720
            raise NotImplementedException(
721
                "A header you provided implies functionality that is not implemented",
722
                Header=header_name,
723
                additionalMessage=f"We don't accept the provided value of {header_name} header for this API",
724
            )
725

726
        system_metadata = get_system_metadata_from_request(request)
1✔
727
        if not system_metadata.get("ContentType"):
1✔
728
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
729

730
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
731

732
        etag_content_md5 = ""
1✔
733
        if content_md5 := request.get("ContentMD5"):
1✔
734
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
735
            etag_content_md5 = base_64_content_md5_to_etag(content_md5)
1✔
736
            if not etag_content_md5:
1✔
737
                raise InvalidDigest(
1✔
738
                    "The Content-MD5 you specified was invalid.",
739
                    Content_MD5=content_md5,
740
                )
741

742
        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
1✔
743
        checksum_value = (
1✔
744
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
745
        )
746

747
        # TODO: we're not encrypting the object with the provided key for now
748
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
749
        validate_sse_c(
1✔
750
            algorithm=request.get("SSECustomerAlgorithm"),
751
            encryption_key=request.get("SSECustomerKey"),
752
            encryption_key_md5=sse_c_key_md5,
753
            server_side_encryption=request.get("ServerSideEncryption"),
754
        )
755

756
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
757
            request,
758
            s3_bucket,
759
            store,
760
        )
761

762
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
1✔
763

764
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)
1✔
765

766
        if tagging := request.get("Tagging"):
1✔
767
            tagging = parse_tagging_header(tagging)
1✔
768

769
        s3_object = S3Object(
1✔
770
            key=key,
771
            version_id=version_id,
772
            storage_class=storage_class,
773
            expires=request.get("Expires"),
774
            user_metadata=request.get("Metadata"),
775
            system_metadata=system_metadata,
776
            checksum_algorithm=checksum_algorithm,
777
            checksum_value=checksum_value,
778
            encryption=encryption_parameters.encryption,
779
            kms_key_id=encryption_parameters.kms_key_id,
780
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
781
            sse_key_hash=sse_c_key_md5,
782
            lock_mode=lock_parameters.lock_mode,
783
            lock_legal_status=lock_parameters.lock_legal_status,
784
            lock_until=lock_parameters.lock_until,
785
            website_redirect_location=request.get("WebsiteRedirectLocation"),
786
            acl=acl,
787
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
788
        )
789

790
        body = request.get("Body")
1✔
791
        # check if chunked request
792
        headers = context.request.headers
1✔
793
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
1✔
794
            "STREAMING-"
795
        ) or "aws-chunked" in headers.get("content-encoding", "")
796
        if is_aws_chunked:
1✔
797
            checksum_algorithm = (
1✔
798
                checksum_algorithm
799
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
800
            )
801
            if checksum_algorithm:
1✔
802
                s3_object.checksum_algorithm = checksum_algorithm
1✔
803

804
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
1✔
805
            body = AwsChunkedDecoder(body, decoded_content_length, s3_object=s3_object)
1✔
806

807
            # S3 removes the `aws-chunked` value from ContentEncoding
808
            if content_encoding := s3_object.system_metadata.pop("ContentEncoding", None):
1✔
809
                encodings = [enc for enc in content_encoding.split(",") if enc != "aws-chunked"]
1✔
810
                if encodings:
1✔
811
                    s3_object.system_metadata["ContentEncoding"] = ",".join(encodings)
1✔
812

813
        with self._storage_backend.open(bucket_name, s3_object, mode="w") as s3_stored_object:
1✔
814
            # as we are inside the lock here, if multiple concurrent requests happen for the same object, it's the first
815
            # one to finish to succeed, and subsequent will raise exceptions. Once the first write finishes, we're
816
            # opening the lock and other requests can check this condition
817
            if if_none_match and object_exists_for_precondition_write(s3_bucket, key):
1✔
818
                raise PreconditionFailed(
1✔
819
                    "At least one of the pre-conditions you specified did not hold",
820
                    Condition="If-None-Match",
821
                )
822

823
            elif if_match:
1✔
824
                verify_object_equality_precondition_write(s3_bucket, key, if_match)
1✔
825

826
            s3_stored_object.write(body)
1✔
827

828
            if s3_object.checksum_algorithm:
1✔
829
                if not s3_object.checksum_value:
1✔
830
                    s3_object.checksum_value = s3_stored_object.checksum
1✔
831
                elif not validate_checksum_value(s3_object.checksum_value, checksum_algorithm):
1✔
832
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
833
                    raise InvalidRequest(
1✔
834
                        f"Value for x-amz-checksum-{s3_object.checksum_algorithm.lower()} header is invalid."
835
                    )
836
                elif s3_object.checksum_value != s3_stored_object.checksum:
1✔
837
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
838
                    raise BadDigest(
1✔
839
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
840
                    )
841

842
            # TODO: handle ContentMD5 and ChecksumAlgorithm in a handler for all requests except requests with a
843
            #  streaming body. We can use the specs to verify which operations needs to have the checksum validated
844
            if content_md5:
1✔
845
                calculated_md5 = etag_to_base_64_content_md5(s3_stored_object.etag)
1✔
846
                if calculated_md5 != content_md5:
1✔
847
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
848
                    raise BadDigest(
1✔
849
                        "The Content-MD5 you specified did not match what we received.",
850
                        ExpectedDigest=etag_content_md5,
851
                        CalculatedDigest=calculated_md5,
852
                    )
853

854
            s3_bucket.objects.set(key, s3_object)
1✔
855

856
        # in case we are overriding an object, delete the tags entry
857
        key_id = get_unique_key_id(bucket_name, key, version_id)
1✔
858
        store.TAGS.tags.pop(key_id, None)
1✔
859
        if tagging:
1✔
860
            store.TAGS.tags[key_id] = tagging
1✔
861

862
        # RequestCharged: Optional[RequestCharged]  # TODO
863
        response = PutObjectOutput(
1✔
864
            ETag=s3_object.quoted_etag,
865
        )
866
        if s3_bucket.versioning_status == "Enabled":
1✔
867
            response["VersionId"] = s3_object.version_id
1✔
868

869
        if s3_object.checksum_algorithm:
1✔
870
            response[f"Checksum{s3_object.checksum_algorithm}"] = s3_object.checksum_value
1✔
871
            response["ChecksumType"] = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)
1✔
872

873
        if s3_bucket.lifecycle_rules:
1✔
874
            if expiration_header := self._get_expiration_header(
1✔
875
                s3_bucket.lifecycle_rules,
876
                bucket_name,
877
                s3_object,
878
                store.TAGS.tags.get(key_id, {}),
879
            ):
880
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
881
                #  apply them everytime we get/head an object
882
                response["Expiration"] = expiration_header
1✔
883

884
        add_encryption_to_response(response, s3_object=s3_object)
1✔
885
        if sse_c_key_md5:
1✔
886
            response["SSECustomerAlgorithm"] = "AES256"
1✔
887
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
888

889
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
890

891
        return response
1✔
892

893
    @handler("GetObject", expand=False)
    def get_object(
        self,
        context: RequestContext,
        request: GetObjectRequest,
    ) -> GetObjectOutput:
        """
        Return the content and metadata of an object, optionally restricted to a byte range or a single part.

        The storage backend handle is opened in read mode and handed over as the response ``Body``; it is
        deliberately not closed on the success path (see the comment next to the ``open`` call). If anything raises
        after the handle was opened, the handle is closed before the exception propagates so that the read lock is
        not leaked.

        :param context: the request context, used to resolve the (possibly cross-account) bucket
        :param request: the GetObject request; reads ``Bucket``, ``Key``, ``VersionId``, ``Range``, ``PartNumber``,
            ``ChecksumMode``, the SSE-C fields and the response header overrides
        :return: the GetObject response, with ``StatusCode`` 206 when a range or a part number was requested
        :raises InvalidObjectState: if the object is in an archive storage class and has no restore information
        :raises InvalidRequest: if the object was stored with an SSE-C key hash and no key MD5 was provided, or if
            both ``Range`` and ``PartNumber`` are specified
        :raises AccessDenied: if the provided SSE-C key MD5 does not match the one stored with the object
        """
        # TODO: missing handling parameters:
        #  request_payer: RequestPayer = None,
        #  expected_bucket_owner: AccountId = None,

        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="GET",
        )

        if s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not s3_object.restore:
            raise InvalidObjectState(
                "The operation is not valid for the object's storage class",
                StorageClass=s3_object.storage_class,
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and s3_object.kms_key_id:
            validate_kms_key_id(kms_key=s3_object.kms_key_id, bucket=s3_bucket)

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        # we're using getattr access because when restoring, the field might not exist
        # TODO: cleanup at next major release
        if sse_key_hash := getattr(s3_object, "sse_key_hash", None):
            if not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        # we deliberately do not call `.close()` on the s3_stored_object on the success path, to keep the read lock
        # acquired. When passing the object to Werkzeug, the handler will call `.close()` after finishing iterating
        # over `__iter__`.
        # If an exception happens between this call and returning the object, nobody would ever call `.close()`,
        # which can lead to deadlocks: the `try`/`except` below closes the stored object in that case.
        s3_stored_object = self._storage_backend.open(bucket_name, s3_object, mode="r")

        try:
            # this is a hacky way to verify the object hasn't been modified between `s3_object = s3_bucket.get_object`
            # and the storage backend call. If it has been modified, now that we're in the read lock, we can safely
            # fetch the object again
            if s3_stored_object.last_modified != s3_object.internal_last_modified:
                s3_object = s3_bucket.get_object(
                    key=object_key,
                    version_id=version_id,
                    http_method="GET",
                )

            response = GetObjectOutput(
                AcceptRanges="bytes",
                **s3_object.get_system_metadata_fields(),
            )
            if s3_object.user_metadata:
                response["Metadata"] = s3_object.user_metadata

            if s3_object.parts and part_number:
                response["PartsCount"] = len(s3_object.parts)

            if s3_object.version_id:
                response["VersionId"] = s3_object.version_id

            if s3_object.website_redirect_location:
                response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

            if s3_object.restore:
                response["Restore"] = s3_object.restore

            # the checksum is only returned if the object has one and the client enabled the checksum mode
            checksum_value = None
            checksum_type = None
            if checksum_algorithm := s3_object.checksum_algorithm:
                if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                    checksum_value = s3_object.checksum_value
                    checksum_type = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)

            if range_data:
                s3_stored_object.seek(range_data.begin)
                response["Body"] = LimitedIterableStream(
                    s3_stored_object, max_length=range_data.content_length
                )
                response["ContentRange"] = range_data.content_range
                response["ContentLength"] = range_data.content_length
                response["StatusCode"] = 206
                if checksum_value:
                    if s3_object.parts and part_number and checksum_type == ChecksumType.COMPOSITE:
                        part_data = s3_object.parts[part_number]
                        checksum_key = f"Checksum{checksum_algorithm.upper()}"
                        response[checksum_key] = part_data.get(checksum_key)
                        response["ChecksumType"] = ChecksumType.COMPOSITE

                    # it means either the range header means the whole object, or that a multipart upload with
                    # `FULL_OBJECT` only had one part
                    elif range_data.content_length == s3_object.size:
                        response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                        response["ChecksumType"] = checksum_type
            else:
                response["Body"] = s3_stored_object
                if checksum_value:
                    response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                    response["ChecksumType"] = checksum_type

            add_encryption_to_response(response, s3_object=s3_object)

            # tags are stored under the version id of the object itself (see `put_object` and `head_object`), which
            # differs from the requested version id when the request does not specify any on a versioned bucket
            if object_tags := store.TAGS.tags.get(
                get_unique_key_id(bucket_name, object_key, s3_object.version_id)
            ):
                response["TagCount"] = len(object_tags)

            if s3_object.is_current and s3_bucket.lifecycle_rules:
                if expiration_header := self._get_expiration_header(
                    s3_bucket.lifecycle_rules,
                    bucket_name,
                    s3_object,
                    object_tags,
                ):
                    # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                    #  apply them everytime we get/head an object
                    response["Expiration"] = expiration_header

            # TODO: missing returned fields
            #     RequestCharged: Optional[RequestCharged]
            #     ReplicationStatus: Optional[ReplicationStatus]

            if s3_object.lock_mode:
                response["ObjectLockMode"] = s3_object.lock_mode
                if s3_object.lock_until:
                    response["ObjectLockRetainUntilDate"] = s3_object.lock_until
            if s3_object.lock_legal_status:
                response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

            if sse_c_key_md5:
                response["SSECustomerAlgorithm"] = "AES256"
                response["SSECustomerKeyMD5"] = sse_c_key_md5

            # the client can override some response headers through request parameters
            for request_param, response_param in ALLOWED_HEADER_OVERRIDES.items():
                if request_param_value := request.get(request_param):
                    response[response_param] = request_param_value

            return response
        except Exception:
            # the stored object will never reach Werkzeug: close it ourselves, otherwise the read lock stays held
            s3_stored_object.close()
            raise
1✔
1061

1062
    @handler("HeadObject", expand=False)
    def head_object(
        self,
        context: RequestContext,
        request: HeadObjectRequest,
    ) -> HeadObjectOutput:
        """
        Return the metadata of an object without its content.

        :param context: the request context, used to resolve the (possibly cross-account) bucket
        :param request: the HeadObject request; reads ``Bucket``, ``Key``, ``VersionId``, ``Range``, ``PartNumber``,
            ``ChecksumMode`` and the SSE-C fields
        :return: the HeadObject response, with ``StatusCode`` 206 when a range or a part number was requested
        :raises InvalidRequest: if the object was stored with an SSE-C key hash and no key MD5 was provided, or if
            both ``Range`` and ``PartNumber`` are specified
        :raises AccessDenied: if the provided SSE-C key MD5 does not match the one stored with the object
        """
        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="HEAD",
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        # same as in `get_object`: we're using getattr access because when restoring, the field might not exist
        # TODO: cleanup at next major release
        if sse_key_hash := getattr(s3_object, "sse_key_hash", None):
            if not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        response = HeadObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = s3_object.user_metadata

        # the checksum is only returned if the object has one and the client enabled the checksum mode
        checksum_value = None
        checksum_type = None
        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                checksum_value = s3_object.checksum_value
                checksum_type = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)

        if s3_object.parts and request.get("PartNumber"):
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        if range_data:
            response["ContentLength"] = range_data.content_length
            response["ContentRange"] = range_data.content_range
            response["StatusCode"] = 206
            if checksum_value:
                if s3_object.parts and part_number and checksum_type == ChecksumType.COMPOSITE:
                    part_data = s3_object.parts[part_number]
                    checksum_key = f"Checksum{checksum_algorithm.upper()}"
                    response[checksum_key] = part_data.get(checksum_key)
                    response["ChecksumType"] = ChecksumType.COMPOSITE

                # it means either the range header means the whole object, or that a multipart upload with `FULL_OBJECT`
                # only had one part
                elif range_data.content_length == s3_object.size:
                    response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                    response["ChecksumType"] = checksum_type
        elif checksum_value:
            response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
            response["ChecksumType"] = checksum_type

        add_encryption_to_response(response, s3_object=s3_object)

        # if you specify the VersionId, AWS won't return the Expiration header, even if that's the current version
        if not version_id and s3_bucket.lifecycle_rules:
            object_tags = store.TAGS.tags.get(
                get_unique_key_id(bucket_name, object_key, s3_object.version_id)
            )
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        # TODO: missing return fields:
        #  ArchiveStatus: Optional[ArchiveStatus]
        #  RequestCharged: Optional[RequestCharged]
        #  ReplicationStatus: Optional[ReplicationStatus]

        return response
1✔
1189

1190
    def delete_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        mfa: MFA = None,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        if_match: IfMatch = None,
        if_match_last_modified_time: IfMatchLastModifiedTime = None,
        if_match_size: IfMatchSize = None,
        **kwargs,
    ) -> DeleteObjectOutput:
        """
        Delete an object, a specific object version, or create a delete marker, depending on the bucket versioning.

        - bucket without versioning status: the object is removed if it exists, and an empty response is returned
        - versioning status set and no ``version_id``: a delete marker is created for the key
        - versioning status set and ``version_id`` given: that version (object or delete marker) is removed

        ``mfa``, ``request_payer`` and ``expected_bucket_owner`` are accepted but not used here.

        :param context: the request context, used to resolve the bucket and to send notifications
        :param bucket: the bucket name
        :param key: the object key
        :param version_id: the version to delete, if any
        :param bypass_governance_retention: passed to the object lock check of the targeted version
        :param if_match: unsupported precondition, see below
        :param if_match_last_modified_time: unsupported precondition, see below
        :param if_match_size: unsupported precondition, see below
        :return: the DeleteObject response
        :raises InvalidArgument: if ``bypass_governance_retention`` is set on a bucket without object lock, or if
            the given version id is invalid for the bucket/key
        :raises NotImplementedException: if any of the ``if_match*`` preconditions is provided
        :raises AccessDenied: if the targeted version is protected by object lock
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (bypass_governance_retention is None or s3_bucket.object_lock_enabled):
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        # TODO: this is only supported for Directory Buckets
        # when several preconditions are provided, the last one of this sequence is the one being reported
        unsupported_header = None
        for header_name, header_value in (
            ("If-Match", if_match),
            ("x-amz-if-match-size", if_match_size),
            ("x-amz-if-match-last-modified-time", if_match_last_modified_time),
        ):
            if header_value:
                unsupported_header = header_name

        if unsupported_header:
            LOG.warning(
                "DeleteObject Preconditions is only supported for Directory Buckets. "
                "LocalStack does not support Directory Buckets yet."
            )
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header=unsupported_header,
            )

        # bucket without versioning status: only the "null" version can be targeted
        if s3_bucket.versioning_status is None:
            if version_id and version_id != "null":
                raise InvalidArgument(
                    "Invalid version id specified",
                    ArgumentName="versionId",
                    ArgumentValue=version_id,
                )

            removed_object = s3_bucket.objects.pop(key, None)
            # TODO: RequestCharged
            if removed_object:
                self._storage_backend.remove(bucket, removed_object)
                self._notify(context, s3_bucket=s3_bucket, s3_object=removed_object)
                tags_key = get_unique_key_id(bucket, key, version_id)
                store.TAGS.tags.pop(tags_key, None)

            return DeleteObjectOutput()

        # versioning status set, no version targeted: a delete marker becomes the new entry for the key
        if not version_id:
            marker = S3DeleteMarker(
                key=key,
                version_id=generate_version_id(s3_bucket.versioning_status),
            )
            s3_bucket.objects.set(key, marker)
            notification_ctx = S3EventNotificationContext.from_request_context_native(
                context,
                s3_bucket=s3_bucket,
                s3_object=marker,
            )
            notification_ctx.event_type = f"{notification_ctx.event_type}MarkerCreated"
            self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=notification_ctx)

            return DeleteObjectOutput(VersionId=marker.version_id, DeleteMarker=True)

        # versioning status set and a specific version is targeted
        if key not in s3_bucket.objects:
            return DeleteObjectOutput()

        targeted_version = s3_bucket.objects.get(key, version_id)
        if not targeted_version:
            raise InvalidArgument(
                "Invalid version id specified",
                ArgumentName="versionId",
                ArgumentValue=version_id,
            )

        if targeted_version.is_locked(bypass_governance_retention):
            raise AccessDenied("Access Denied because object protected by object lock.")

        s3_bucket.objects.pop(object_key=key, version_id=version_id)
        response = DeleteObjectOutput(VersionId=targeted_version.version_id)

        if not isinstance(targeted_version, S3DeleteMarker):
            # only real objects are removed from the storage backend and from the tags store
            self._storage_backend.remove(bucket, targeted_version)
            store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
        else:
            response["DeleteMarker"] = True
        self._notify(context, s3_bucket=s3_bucket, s3_object=targeted_version)

        return response
1✔
1286

1287
    def delete_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delete: Delete,
        mfa: MFA = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> DeleteObjectsOutput:
        """
        Delete several objects or object versions of a bucket in one call.

        Each entry of ``delete["Objects"]`` is handled independently: a failing entry is reported in the ``Errors``
        list of the response and does not abort the others. The objects are removed from the storage backend in
        one single call, once every entry has been processed.

        ``mfa``, ``request_payer``, ``expected_bucket_owner`` and ``checksum_algorithm`` are accepted but not used
        here.

        :param context: the request context, used to resolve the bucket and to send notifications
        :param bucket: the bucket name
        :param delete: the request payload; reads ``Objects`` (entries with ``Key`` and optional ``VersionId``)
            and ``Quiet``
        :param bypass_governance_retention: passed to the object lock check of every targeted version
        :return: a response containing ``Errors`` if any entry failed, and ``Deleted`` unless ``Quiet`` is set
        :raises InvalidArgument: if ``bypass_governance_retention`` is set on a bucket without object lock
        :raises MalformedXML: if the payload does not contain any object
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (bypass_governance_retention is None or s3_bucket.object_lock_enabled):
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        entries: list[ObjectIdentifier] = delete.get("Objects")
        if not entries:
            raise MalformedXML()

        # TODO: max 1000 delete at once? test against AWS?

        quiet = delete.get("Quiet", False)
        deleted_entries = []
        failed_entries = []

        # objects to remove from the storage backend, all at once after the loop
        pending_removal = []
        for entry in entries:
            entry_key = entry.get("Key")
            entry_version_id = entry.get("VersionId")

            # bucket without versioning status: only the "null" version can be targeted
            if s3_bucket.versioning_status is None:
                if entry_version_id and entry_version_id != "null":
                    failed_entries.append(
                        Error(
                            Code="NoSuchVersion",
                            Key=entry_key,
                            Message="The specified version does not exist.",
                            VersionId=entry_version_id,
                        )
                    )
                    continue

                existing_object = s3_bucket.objects.pop(entry_key, None)
                if existing_object:
                    pending_removal.append(existing_object)
                    self._notify(context, s3_bucket=s3_bucket, s3_object=existing_object)
                    tags_key = get_unique_key_id(bucket, entry_key, entry_version_id)
                    store.TAGS.tags.pop(tags_key, None)
                # small hack to not create a fake object for nothing
                elif s3_bucket.notification_configuration:
                    # DeleteObjects is a bit weird, even if the object didn't exist, S3 will trigger a notification
                    # for a non-existing object being deleted
                    placeholder = S3Object(key=entry_key, etag="")
                    self._notify(context, s3_bucket=s3_bucket, s3_object=placeholder)

                if not quiet:
                    deleted_entries.append(DeletedObject(Key=entry_key))

                continue

            # versioning status set, no version targeted: a delete marker becomes the new entry for the key
            if not entry_version_id:
                marker_version_id = generate_version_id(s3_bucket.versioning_status)
                marker = S3DeleteMarker(key=entry_key, version_id=marker_version_id)
                s3_bucket.objects.set(entry_key, marker)
                notification_ctx = S3EventNotificationContext.from_request_context_native(
                    context,
                    s3_bucket=s3_bucket,
                    s3_object=marker,
                )
                notification_ctx.event_type = f"{notification_ctx.event_type}MarkerCreated"
                self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=notification_ctx)

                if not quiet:
                    deleted_entries.append(
                        DeletedObject(
                            DeleteMarker=True,
                            DeleteMarkerVersionId=marker_version_id,
                            Key=entry_key,
                        )
                    )
                continue

            # versioning status set and a specific version is targeted
            targeted_version = s3_bucket.objects.get(
                object_key=entry_key, version_id=entry_version_id
            )
            if not targeted_version:
                failed_entries.append(
                    Error(
                        Code="NoSuchVersion",
                        Key=entry_key,
                        Message="The specified version does not exist.",
                        VersionId=entry_version_id,
                    )
                )
                continue

            if targeted_version.is_locked(bypass_governance_retention):
                failed_entries.append(
                    Error(
                        Code="AccessDenied",
                        Key=entry_key,
                        Message="Access Denied because object protected by object lock.",
                        VersionId=entry_version_id,
                    )
                )
                continue

            s3_bucket.objects.pop(object_key=entry_key, version_id=entry_version_id)
            if not quiet:
                deleted_entry = DeletedObject(
                    Key=entry_key,
                    VersionId=entry_version_id,
                )
                if isinstance(targeted_version, S3DeleteMarker):
                    deleted_entry["DeleteMarker"] = True
                    deleted_entry["DeleteMarkerVersionId"] = targeted_version.version_id

                deleted_entries.append(deleted_entry)

            # delete markers are not queued for removal from the storage backend
            if isinstance(targeted_version, S3Object):
                pending_removal.append(targeted_version)

            self._notify(context, s3_bucket=s3_bucket, s3_object=targeted_version)
            store.TAGS.tags.pop(get_unique_key_id(bucket, entry_key, entry_version_id), None)

        # TODO: request charged
        self._storage_backend.remove(bucket, pending_removal)
        response: DeleteObjectsOutput = {}
        # AWS validated: the list of Deleted objects is unordered, multiple identical calls can return different results
        if failed_entries:
            response["Errors"] = failed_entries
        if not quiet:
            response["Deleted"] = deleted_entries

        return response
1✔
1425

1426
    @handler("CopyObject", expand=False)
    def copy_object(
        self,
        context: RequestContext,
        request: CopyObjectRequest,
    ) -> CopyObjectOutput:
        """
        Copy the object referenced by ``CopySource`` to ``request["Bucket"]`` / ``request["Key"]``.

        The request is validated first, in this order: destination key, destination and source bucket
        lookup, KMS key (unless ``config.S3_SKIP_KMS_KEY_VALIDATION`` is set), source object lookup, source
        storage class, copy-source preconditions, source SSE-C parameters, target SSE-C parameters and
        finally the "copy onto itself" rule. Only then is the new ``S3Object`` built, its content copied
        through the storage backend, registered in the destination bucket, tagged, and notified.

        :param context: the request context
        :param request: the CopyObject request
        :return: a ``CopyObjectOutput`` holding the ``CopyObjectResult`` plus, when they apply, the version
            id, expiration, encryption fields and the source version id
        :raises InvalidRequest: if the source refers to a delete marker by version id, if the source is
            SSE-C protected and no source key MD5 was given, or if the object is copied onto itself
            without changing any of its attributes
        :raises InvalidObjectState: if the source is in an archive storage class and has no restore value
        :raises PreconditionFailed: if one of the copy-source preconditions does not hold
        :raises AccessDenied: if the given source SSE-C key MD5 differs from the stored one
        """
        # request_payer: RequestPayer = None,  # TODO:
        dest_bucket = request["Bucket"]
        dest_key = request["Key"]
        validate_object_key(dest_key)
        store, dest_s3_bucket = self._get_cross_account_bucket(context, dest_bucket)

        copy_source = request.get("CopySource")
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
            copy_source
        )
        _, src_s3_bucket = self._get_cross_account_bucket(context, src_bucket)

        if not config.S3_SKIP_KMS_KEY_VALIDATION:
            if sse_kms_key_id := request.get("SSEKMSKeyId"):
                validate_kms_key_id(sse_kms_key_id, dest_s3_bucket)

        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
        try:
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
        except MethodNotAllowed:
            raise InvalidRequest(
                "The source of a copy request may not specifically refer to a delete marker by version id."
            )

        src_storage_class = src_s3_object.storage_class
        if src_storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
            raise InvalidObjectState(
                "Operation is not valid for the source object's storage class",
                StorageClass=src_storage_class,
            )

        failed_condition = get_failed_precondition_copy_source(
            request, src_s3_object.last_modified, src_s3_object.etag
        )
        if failed_condition:
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition=failed_condition,
            )

        # a source stored with SSE-C can only be read when the matching key MD5 is supplied
        source_sse_c_key_md5 = request.get("CopySourceSSECustomerKeyMD5")
        if stored_sse_key_hash := src_s3_object.sse_key_hash:
            if not source_sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            if stored_sse_key_hash != source_sse_c_key_md5:
                raise AccessDenied("Access Denied")

        # validate source SSE-C parameters
        validate_sse_c(
            algorithm=request.get("CopySourceSSECustomerAlgorithm"),
            encryption_key=request.get("CopySourceSSECustomerKey"),
            encryption_key_md5=source_sse_c_key_md5,
        )

        # validate target SSE-C parameters
        target_sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        server_side_encryption = request.get("ServerSideEncryption")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=target_sse_c_key_md5,
            server_side_encryption=server_side_encryption,
        )

        # TODO validate order of validation
        storage_class = request.get("StorageClass")
        replace_metadata = request.get("MetadataDirective") == "REPLACE"
        website_redirect_location = request.get("WebsiteRedirectLocation")
        # we need to check for identity of the object, to see if the default one has been changed
        is_default_encryption = (
            dest_s3_bucket.encryption_rule is DEFAULT_BUCKET_ENCRYPTION
            and src_s3_object.encryption == "AES256"
        )

        if src_bucket == dest_bucket and src_key == dest_key:
            # copying an object onto itself is only legal if at least one attribute changes
            attribute_changes = (
                storage_class,
                server_side_encryption,
                target_sse_c_key_md5,
                replace_metadata,
                website_redirect_location,
                # S3 will allow copy in place if the bucket has encryption configured
                dest_s3_bucket.encryption_rule and not is_default_encryption,
                src_s3_object.restore,
            )
            if not any(attribute_changes):
                raise InvalidRequest(
                    "This copy request is illegal because it is trying to copy an object to itself without changing the "
                    "object's metadata, storage class, website redirect location or encryption attributes."
                )

        raw_tagging = request.get("Tagging")
        tagging = parse_tagging_header(raw_tagging) if raw_tagging else raw_tagging

        if replace_metadata:
            user_metadata = request.get("Metadata")
            system_metadata = get_system_metadata_from_request(request)
            if not system_metadata.get("ContentType"):
                system_metadata["ContentType"] = "binary/octet-stream"
        else:
            # default directive: carry the source object's metadata over
            user_metadata = src_s3_object.user_metadata
            system_metadata = src_s3_object.system_metadata

        dest_version_id = generate_version_id(dest_s3_bucket.versioning_status)

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            dest_s3_bucket,
            store,
        )
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(
            request, dest_s3_bucket
        )
        dest_owner = dest_s3_bucket.owner
        acl = get_access_control_policy_for_new_resource_request(request, owner=dest_owner)
        # an explicitly requested checksum algorithm wins over the one stored on the source
        checksum_algorithm = request.get("ChecksumAlgorithm") or src_s3_object.checksum_algorithm

        s3_object = S3Object(
            key=dest_key,
            size=src_s3_object.size,
            version_id=dest_version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            # CopyObject does not inherit from the bucket here
            bucket_key_enabled=request.get("BucketKeyEnabled"),
            sse_key_hash=target_sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=website_redirect_location,
            expiration=None,  # TODO, from lifecycle
            acl=acl,
            owner=dest_owner,
        )

        with self._storage_backend.copy(
            src_bucket=src_bucket,
            src_object=src_s3_object,
            dest_bucket=dest_bucket,
            dest_object=s3_object,
        ) as s3_stored_object:
            # fall back to the source values when the stored copy does not report its own
            s3_object.checksum_value = s3_stored_object.checksum or src_s3_object.checksum_value
            s3_object.etag = s3_stored_object.etag or src_s3_object.etag

            dest_s3_bucket.objects.set(dest_key, s3_object)

        dest_key_id = get_unique_key_id(dest_bucket, dest_key, dest_version_id)

        if request.get("TaggingDirective") == "REPLACE":
            store.TAGS.tags[dest_key_id] = tagging or {}
        else:
            # default directive: the destination gets its own shallow copy of the source tags
            src_key_id = get_unique_key_id(src_bucket, src_key, src_s3_object.version_id)
            src_tags = store.TAGS.tags.get(src_key_id, {})
            store.TAGS.tags[dest_key_id] = copy.copy(src_tags)

        copy_object_result = CopyObjectResult(
            ETag=s3_object.quoted_etag,
            LastModified=s3_object.last_modified,
        )
        if object_checksum_algorithm := s3_object.checksum_algorithm:
            checksum_field = f"Checksum{object_checksum_algorithm.upper()}"
            copy_object_result[checksum_field] = s3_object.checksum_value

        response = CopyObjectOutput(CopyObjectResult=copy_object_result)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.expiration:
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime

        add_encryption_to_response(response, s3_object=s3_object)
        if target_sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = target_sse_c_key_md5

        # only report the source version for a versioned source that is not the "null" version
        if src_s3_bucket.versioning_status:
            src_object_version_id = src_s3_object.version_id
            if src_object_version_id and src_object_version_id != "null":
                response["CopySourceVersionId"] = src_object_version_id

        # RequestCharged: Optional[RequestCharged] # TODO
        self._notify(context, s3_bucket=dest_s3_bucket, s3_object=s3_object)

        return response
1✔
1631

1632
    def list_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        marker: Marker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsOutput:
        """
        List the objects of a bucket in key order (ListObjects, version 1).

        Every returned entry is either an object or, when ``delimiter`` is set, a CommonPrefix grouping
        all keys that share the same string between ``prefix`` and the first occurrence of the delimiter.
        Both kinds of entries count towards ``max_keys``.

        The listing is only flagged as truncated when a further entry would actually have been listed.
        Keys that are filtered out by the prefix, or that fall into an already returned CommonPrefix,
        never cause ``IsTruncated`` to be set.

        :param context: the request context
        :param bucket: name of the bucket to list
        :param delimiter: string used to group keys into CommonPrefixes
        :param encoding_type: if set, keys, prefix and delimiter are URL-quoted in the response
        :param marker: only keys strictly greater than this value are listed
        :param max_keys: maximum number of entries to return, defaults to 1000 when not set (or 0)
        :param prefix: only keys starting with this value are listed
        :return: the ListObjectsOutput; ``NextMarker`` is only set for truncated listings using a delimiter
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        common_prefixes = set()
        count = 0
        is_truncated = False
        next_key_marker = None
        # key or CommonPrefix of the entry most recently added to the page
        last_listed_entry = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)

        s3_objects: list[Object] = []

        # sort by key
        for s3_object in sorted(s3_bucket.objects.values(), key=lambda r: r.key):
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
            # skip all keys that alphabetically come before (or are equal to) the marker
            if marker and key <= marker:
                continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # separate keys that contain the same string between the prefix and the first occurrence of
            # the delimiter
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                # a CommonPrefix that was already returned (in this page, or as the marker of a previous
                # one) does not count towards MaxKeys and is skipped without increasing the counter
                if prefix_including_delimiter in common_prefixes or (
                    marker and marker.startswith(prefix_including_delimiter)
                ):
                    continue

            # After skipping all entries, verify we're not over the MaxKeys before adding a new entry.
            # Reaching this point with a full page proves that at least one more entry exists.
            if count >= max_keys:
                is_truncated = True
                next_key_marker = last_listed_entry
                break

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
                last_listed_entry = prefix_including_delimiter
            else:
                # TODO: add RestoreStatus if present
                object_data = Object(
                    Key=key,
                    ETag=s3_object.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify reality
                    Size=s3_object.size,
                    LastModified=s3_object.last_modified,
                    StorageClass=s3_object.storage_class,
                )

                if s3_object.checksum_algorithm:
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
                    object_data["ChecksumType"] = getattr(
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                s3_objects.append(object_data)
                last_listed_entry = key

            # we just added a CommonPrefix or an Object, increase the counter
            count += 1

        common_prefixes = [
            CommonPrefix(Prefix=common_prefix) for common_prefix in sorted(common_prefixes)
        ]

        response = ListObjectsOutput(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            Marker=marker or "",
        )
        if s3_objects:
            response["Contents"] = s3_objects
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if delimiter and next_key_marker:
            response["NextMarker"] = next_key_marker
        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response
1✔
1742

1743
    def list_objects_v2(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        continuation_token: Token = None,
        fetch_owner: FetchOwner = None,
        start_after: StartAfter = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsV2Output:
        """
        List the objects of a bucket in key order (ListObjects, version 2).

        Every returned entry is either an object or, when ``delimiter`` is set, a CommonPrefix grouping
        all keys that share the same string between ``prefix`` and the first occurrence of the delimiter.
        Both kinds of entries count towards ``max_keys`` and ``KeyCount``.

        The continuation token handed out for a truncated listing is the URL-safe base64 encoding of the
        raw (never URL-quoted) key the next page has to start with.

        :param context: the request context
        :param bucket: name of the bucket to list
        :param delimiter: string used to group keys into CommonPrefixes
        :param encoding_type: if set, keys, prefix and delimiter are URL-quoted in the response
        :param max_keys: maximum number of entries to return, defaults to 1000 when not set (or 0)
        :param prefix: only keys starting with this value are listed
        :param continuation_token: token returned by a previous truncated listing; takes precedence over
            ``start_after``
        :param fetch_owner: if set, the bucket owner is added to every listed object
        :param start_after: only keys strictly greater than this value are listed
        :return: the ListObjectsV2Output
        :raises InvalidArgument: if the continuation token is empty or cannot be decoded
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if continuation_token == "":
            raise InvalidArgument(
                "The continuation token provided is incorrect",
                ArgumentName="continuation-token",
            )

        common_prefixes = set()
        count = 0
        is_truncated = False
        next_continuation_token = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)

        decoded_continuation_token = None
        if continuation_token:
            try:
                decoded_continuation_token = to_str(
                    base64.urlsafe_b64decode(continuation_token.encode())
                )
            except ValueError:
                # binascii.Error (malformed base64) and the Unicode encode/decode errors are all
                # ValueError subclasses: report a client error, exactly like for the empty token
                raise InvalidArgument(
                    "The continuation token provided is incorrect",
                    ArgumentName="continuation-token",
                ) from None

        s3_objects: list[Object] = []

        # sort by key
        for s3_object in sorted(s3_bucket.objects.values(), key=lambda r: r.key):
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key

            # skip all keys that alphabetically come before continuation_token
            if continuation_token:
                # the token encodes the raw key and the iteration is ordered by raw key, so the raw key
                # must be compared here: its URL-quoted form can sort before the token it produced
                if s3_object.key < decoded_continuation_token:
                    continue

            elif start_after:
                # NOTE(review): this compares the possibly URL-quoted key with the value as sent by the
                # client - confirm whether the raw key should be used when encoding_type is set
                if key <= start_after:
                    continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # separate keys that contain the same string between the prefix and the first occurrence of the delimiter
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                # if the CommonPrefix is already in the CommonPrefixes, it doesn't count towards MaxKey, we can skip
                # the entry without increasing the counter. We need to iterate over all of these entries before
                # returning the next continuation marker, to properly start at the next key after this CommonPrefix
                if prefix_including_delimiter in common_prefixes:
                    continue

            # After skipping all entries, verify we're not over the MaxKeys before adding a new entry
            if count >= max_keys:
                is_truncated = True
                next_continuation_token = to_str(base64.urlsafe_b64encode(s3_object.key.encode()))
                break

            # if we found a new CommonPrefix, add it to the CommonPrefixes
            # else, it means it's a new Object, add it to the Contents
            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
            else:
                # TODO: add RestoreStatus if present
                object_data = Object(
                    Key=key,
                    ETag=s3_object.quoted_etag,
                    Size=s3_object.size,
                    LastModified=s3_object.last_modified,
                    StorageClass=s3_object.storage_class,
                )

                if fetch_owner:
                    object_data["Owner"] = s3_bucket.owner

                if s3_object.checksum_algorithm:
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
                    object_data["ChecksumType"] = getattr(
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                s3_objects.append(object_data)

            # we just added either a CommonPrefix or an Object to the List, increase the counter by one
            count += 1

        common_prefixes = [
            CommonPrefix(Prefix=common_prefix) for common_prefix in sorted(common_prefixes)
        ]

        response = ListObjectsV2Output(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            KeyCount=count,
        )
        if s3_objects:
            response["Contents"] = s3_objects
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_continuation_token:
            response["NextContinuationToken"] = next_continuation_token

        # the response echoes whichever of the two start positions was actually used
        if continuation_token:
            response["ContinuationToken"] = continuation_token
        elif start_after:
            response["StartAfter"] = start_after

        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response
1✔
1878

1879
    def list_object_versions(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        key_marker: KeyMarker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        version_id_marker: VersionIdMarker = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectVersionsOutput:
        """
        List all object versions and delete markers of a bucket.

        Entries are ordered by key, and for the same key from the most to the least recently modified.
        Every returned entry is an object version, a delete marker or, when ``delimiter`` is set, a
        CommonPrefix; all three kinds count towards ``max_keys``.

        :param context: the request context
        :param bucket: name of the bucket to list
        :param delimiter: string used to group keys into CommonPrefixes
        :param encoding_type: if set, keys, prefix and delimiter are URL-quoted in the response
        :param key_marker: key to resume the listing from
        :param max_keys: maximum number of entries to return, defaults to 1000 when not set (or 0)
        :param prefix: only keys starting with this value are listed
        :param version_id_marker: version of ``key_marker`` to resume the listing after
        :return: the ListObjectVersionsOutput
        :raises InvalidArgument: if ``version_id_marker`` is given without ``key_marker``
        """
        if version_id_marker and not key_marker:
            raise InvalidArgument(
                "A version-id marker cannot be specified without a key marker.",
                ArgumentName="version-id-marker",
                ArgumentValue=version_id_marker,
            )

        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        common_prefixes = set()
        count = 0
        is_truncated = False
        next_key_marker = None
        next_version_id_marker = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
        version_key_marker_found = False

        object_versions: list[ObjectVersion] = []
        delete_markers: list[DeleteMarkerEntry] = []

        all_versions = s3_bucket.objects.values(with_versions=True)
        # sort by key, and last-modified-date, to get the last version first
        all_versions.sort(key=lambda r: (r.key, -r.last_modified.timestamp()))
        last_version = all_versions[-1] if all_versions else None

        for version in all_versions:
            key = urlparse.quote(version.key) if encoding_type else version.key
            # skip all keys that alphabetically come before key_marker
            if key_marker:
                if key < key_marker:
                    continue
                elif key == key_marker:
                    if not version_id_marker:
                        continue
                    # as the keys are ordered by time, once we found the key marker, we can return the next one
                    if version.version_id == version_id_marker:
                        version_key_marker_found = True
                        continue

                    # it is possible that the version_id_marker related object has been deleted, in that case, start
                    # as soon as the next version id is older than the version id marker (meaning this version was
                    # next after the now-deleted version)
                    elif is_version_older_than_other(version.version_id, version_id_marker):
                        version_key_marker_found = True

                    elif not version_key_marker_found:
                        # as long as we have not passed the version_key_marker, skip the versions
                        continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                if prefix_including_delimiter in common_prefixes or (
                    key_marker and key_marker.startswith(prefix_including_delimiter)
                ):
                    continue

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)

            elif isinstance(version, S3DeleteMarker):
                delete_marker = DeleteMarkerEntry(
                    Key=key,
                    Owner=s3_bucket.owner,
                    VersionId=version.version_id,
                    IsLatest=version.is_current,
                    LastModified=version.last_modified,
                )
                delete_markers.append(delete_marker)
            else:
                # TODO: add RestoreStatus if present
                object_version = ObjectVersion(
                    Key=key,
                    ETag=version.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify reality
                    Size=version.size,
                    VersionId=version.version_id or "null",
                    LastModified=version.last_modified,
                    IsLatest=version.is_current,
                    # TODO: verify this, are other class possible?
                    # StorageClass=version.storage_class,
                    StorageClass=ObjectVersionStorageClass.STANDARD,
                )

                if version.checksum_algorithm:
                    object_version["ChecksumAlgorithm"] = [version.checksum_algorithm]
                    object_version["ChecksumType"] = getattr(
                        version, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                object_versions.append(object_version)

            # we just added a CommonPrefix, an Object or a DeleteMarker, increase the counter
            count += 1
            # The page is full: the listing is truncated unless this was the very last entry of the bucket.
            # Identity is compared rather than version ids, as those are not unique across keys (the
            # version id can be unset, see the `or "null"` above), which made the check never succeed.
            if count >= max_keys and version is not last_version:
                is_truncated = True
                if prefix_including_delimiter:
                    next_key_marker = prefix_including_delimiter
                else:
                    next_key_marker = version.key
                    next_version_id_marker = version.version_id
                break

        common_prefixes = [
            CommonPrefix(Prefix=common_prefix) for common_prefix in sorted(common_prefixes)
        ]

        response = ListObjectVersionsOutput(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix,
            KeyMarker=key_marker or "",
            VersionIdMarker=version_id_marker or "",
        )
        if object_versions:
            response["Versions"] = object_versions
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delete_markers:
            response["DeleteMarkers"] = delete_markers
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_key_marker:
            response["NextKeyMarker"] = next_key_marker
        if next_version_id_marker:
            response["NextVersionIdMarker"] = next_version_id_marker

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response
1✔
2035

2036
    @handler("GetObjectAttributes", expand=False)
    def get_object_attributes(
        self,
        context: RequestContext,
        request: GetObjectAttributesRequest,
    ) -> GetObjectAttributesOutput:
        """
        Return the object metadata selected through the request's ``ObjectAttributes`` list.

        ``LastModified`` is always set on the response, and ``VersionId`` is set whenever the bucket has a
        versioning status. ``ETag``, ``StorageClass``, ``ObjectSize``, ``Checksum`` and ``ObjectParts`` are only
        returned when requested (and, for the last two, when the object carries the matching data).

        :param context: the request context, used to resolve the (possibly cross-account) bucket
        :param request: the GetObjectAttributes request
        :return: the output populated with the requested attributes
        :raises InvalidRequest: if the object has an SSE-C key hash but the request has no SSECustomerKeyMD5
        :raises AccessDenied: if the provided SSECustomerKeyMD5 differs from the key hash stored on the object
        """
        bucket_name = request["Bucket"]
        object_key = request["Key"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=request.get("VersionId"),
            http_method="GET",
        )

        # the SSE-C key hash stored on the object is checked before the SSE-C parameters are validated
        provided_key_md5 = request.get("SSECustomerKeyMD5")
        if s3_object.sse_key_hash:
            if not provided_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            if s3_object.sse_key_hash != provided_key_md5:
                raise AccessDenied("Access Denied")

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=provided_key_md5,
        )

        requested_attrs = request.get("ObjectAttributes", [])
        response = GetObjectAttributesOutput()
        # objects without a `checksum_type` attribute are treated as FULL_OBJECT
        checksum_type = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)

        if "ETag" in requested_attrs:
            response["ETag"] = s3_object.etag
        if "StorageClass" in requested_attrs:
            response["StorageClass"] = s3_object.storage_class
        if "ObjectSize" in requested_attrs:
            response["ObjectSize"] = s3_object.size
        if "Checksum" in requested_attrs:
            algorithm = s3_object.checksum_algorithm
            if algorithm:
                # for objects with parts, only the portion before the first "-" of the stored value is returned
                response["Checksum"] = {
                    f"Checksum{algorithm.upper()}": (
                        s3_object.checksum_value.split("-")[0]
                        if s3_object.parts
                        else s3_object.checksum_value
                    ),
                    "ChecksumType": checksum_type,
                }

        response["LastModified"] = s3_object.last_modified

        if s3_bucket.versioning_status:
            response["VersionId"] = s3_object.version_id

        if "ObjectParts" not in requested_attrs or not s3_object.parts:
            return response

        if checksum_type == ChecksumType.FULL_OBJECT:
            response["ObjectParts"] = GetObjectAttributesParts(
                TotalPartsCount=len(s3_object.parts)
            )
            return response

        # this is basically a simplified `ListParts` call on the object, only returned when the checksum type is
        # COMPOSITE
        marker = request.get("PartNumberMarker") or 0
        page_size = request.get("MaxParts") or 1000
        ordered_parts = sorted(s3_object.parts.items())
        final_part_number, final_part = ordered_parts[-1]

        # TODO: remove this backward compatibility hack needed for state created with <= 4.5
        #  the parts would only be a tuple and would not store the proper state for 4.5 and earlier, so we need
        #  to return early
        if isinstance(final_part, tuple):
            response["ObjectParts"] = GetObjectAttributesParts(
                TotalPartsCount=len(s3_object.parts)
            )
            return response

        page = []
        truncated = False
        for number, part in ordered_parts:
            if number <= marker:
                continue

            page.append(select_from_typed_dict(ObjectPart, part))

            # the page is full: it is only truncated if the part just added is not the very last one
            if len(page) >= page_size and part["PartNumber"] != final_part_number:
                truncated = True
                break

        parts_output = GetObjectAttributesParts(
            TotalPartsCount=len(s3_object.parts),
            IsTruncated=truncated,
            MaxParts=page_size,
            PartNumberMarker=marker,
            NextPartNumberMarker=page[-1]["PartNumber"] if page else 0,
        )
        if page:
            parts_output["Parts"] = page

        response["ObjectParts"] = parts_output
        return response
1✔
2144

2145
    def restore_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        restore_request: RestoreRequest = None,
        request_payer: RequestPayer = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> RestoreObjectOutput:
        """
        Restore an object stored in one of the archive storage classes.

        Only the ``Days`` field of the restore request is honoured. The restore is instantaneous: the object's
        ``restore`` value is set with ``ongoing-request="false"`` right away, and two notifications are sent back
        to back, the second one with ``Post`` replaced by ``Completed`` in its event type.

        :param context: the request context, used to resolve the bucket and to build the notifications
        :param bucket: name of the bucket holding the object
        :param key: key of the object to restore
        :param version_id: optional version of the object to restore
        :param restore_request: the restore parameters; when missing or without ``Days``, nothing is restored and
            an empty output is returned
        :param request_payer: not used by this implementation
        :param checksum_algorithm: not used by this implementation
        :param expected_bucket_owner: not used by this implementation
        :return: output with ``StatusCode`` 200 if the object already had a restore value, 202 otherwise
        :raises InvalidObjectState: if the object's storage class is not an archive storage class
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",  # TODO: verify http method
        )
        if s3_object.storage_class not in ARCHIVES_STORAGE_CLASSES:
            raise InvalidObjectState(StorageClass=s3_object.storage_class)

        # TODO: moto only supported the "Days" parameter from RestoreRequest, and was ignoring the others
        #  we only implement the same functionality for now

        # if a request was already done and the object was available, and we're updating it, set the status code to 200
        status_code = 200 if s3_object.restore else 202
        # `restore_request` defaults to None: guard the lookup so a request without it takes the early return
        # below instead of failing with an AttributeError
        restore_days = restore_request.get("Days") if restore_request else None
        if not restore_days:
            LOG.debug("LocalStack does not support restore SELECT requests yet.")
            return RestoreObjectOutput()

        restore_expiration_date = add_expiration_days_to_datetime(
            datetime.datetime.now(datetime.UTC), restore_days
        )
        # TODO: add a way to transition from ongoing-request=true to false? for now it is instant
        s3_object.restore = f'ongoing-request="false", expiry-date="{restore_expiration_date}"'

        s3_notif_ctx_initiated = S3EventNotificationContext.from_request_context_native(
            context,
            s3_bucket=s3_bucket,
            s3_object=s3_object,
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_initiated)
        # But because it's instant in LocalStack, we can directly send the Completed notification as well
        # We just need to copy the context so that we don't mutate the first context while it could be sent
        # And modify its event type from `ObjectRestore:Post` to `ObjectRestore:Completed`
        s3_notif_ctx_completed = copy.copy(s3_notif_ctx_initiated)
        s3_notif_ctx_completed.event_type = s3_notif_ctx_completed.event_type.replace(
            "Post", "Completed"
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_completed)

        # TODO: request charged
        return RestoreObjectOutput(StatusCode=status_code)
1✔
2200

2201
    @handler("CreateMultipartUpload", expand=False)
    def create_multipart_upload(
        self,
        context: RequestContext,
        request: CreateMultipartUploadRequest,
    ) -> CreateMultipartUploadOutput:
        """
        Validate the request, then register a new multipart upload on the target bucket.

        When only a checksum algorithm is given, the checksum type defaults to FULL_OBJECT for CRC64NVME and to
        COMPOSITE for every other algorithm.

        :param context: the request context, used to resolve the bucket and the upload initiator
        :param request: the CreateMultipartUpload request
        :return: output carrying the bucket, key and upload id, plus checksum and encryption fields if relevant
        :raises InvalidStorageClass: if the storage class is not in STORAGE_CLASSES or is OUTPOSTS
        :raises InvalidRequest: if the checksum algorithm is unsupported, if a checksum type is given without
            an algorithm, or if the checksum type and algorithm cannot be combined
        """
        # TODO: handle missing parameters:
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        storage_class = request.get("StorageClass")
        if storage_class is not None:
            is_rejected = (
                storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
            )
            if is_rejected:
                raise InvalidStorageClass(
                    "The storage class you specified is not valid",
                    StorageClassRequested=storage_class,
                )

        if not config.S3_SKIP_KMS_KEY_VALIDATION:
            requested_kms_key_id = request.get("SSEKMSKeyId")
            if requested_kms_key_id:
                validate_kms_key_id(requested_kms_key_id, s3_bucket)

        tagging = request.get("Tagging")
        if tagging:
            tagging = parse_tagging_header(tagging_header=tagging)

        key = request["Key"]

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        checksum_algorithm = request.get("ChecksumAlgorithm")
        if checksum_algorithm and checksum_algorithm not in CHECKSUM_ALGORITHMS:
            raise InvalidRequest(
                "Checksum algorithm provided is unsupported. Please try again with any of the valid types: [CRC32, CRC32C, SHA1, SHA256]"
            )

        checksum_type = request.get("ChecksumType")
        if checksum_type:
            if not checksum_algorithm:
                raise InvalidRequest(
                    "The x-amz-checksum-type header can only be used with the x-amz-checksum-algorithm header."
                )
        elif checksum_algorithm:
            # no explicit type: derive the default one from the algorithm
            checksum_type = (
                ChecksumType.FULL_OBJECT
                if checksum_algorithm == ChecksumAlgorithm.CRC64NVME
                else ChecksumType.COMPOSITE
            )

        if (
            checksum_type == ChecksumType.COMPOSITE
            and checksum_algorithm == ChecksumAlgorithm.CRC64NVME
        ):
            raise InvalidRequest(
                "The COMPOSITE checksum type cannot be used with the crc64nvme checksum algorithm."
            )
        if checksum_type == ChecksumType.FULL_OBJECT and checksum_algorithm.upper().startswith(
            "SHA"
        ):
            raise InvalidRequest(
                f"The FULL_OBJECT checksum type cannot be used with the {checksum_algorithm.lower()} checksum algorithm."
            )

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
            server_side_encryption=request.get("ServerSideEncryption"),
        )

        encryption = get_encryption_parameters_from_request_and_bucket(request, s3_bucket, store)
        object_lock = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        s3_multipart = S3Multipart(
            key=key,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=request.get("Metadata"),
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_type=checksum_type,
            encryption=encryption.encryption,
            kms_key_id=encryption.kms_key_id,
            bucket_key_enabled=encryption.bucket_key_enabled,
            sse_key_hash=sse_c_key_md5,
            lock_mode=object_lock.lock_mode,
            lock_legal_status=object_lock.lock_legal_status,
            lock_until=object_lock.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            expiration=None,  # TODO, from lifecycle, or should it be updated with config?
            acl=acl,
            initiator=get_owner_for_account_id(context.account_id),
            tagging=tagging,
            owner=s3_bucket.owner,
            precondition=object_exists_for_precondition_write(s3_bucket, key),
        )
        # it seems if there is SSE-C on the multipart, AWS S3 will override the default Checksum behavior (but not on
        # PutObject)
        if sse_c_key_md5:
            s3_multipart.object.checksum_algorithm = None

        s3_bucket.multiparts[s3_multipart.id] = s3_multipart

        response = CreateMultipartUploadOutput(
            Bucket=bucket_name,
            Key=key,
            UploadId=s3_multipart.id,
        )

        if checksum_algorithm:
            response["ChecksumAlgorithm"] = checksum_algorithm
            response["ChecksumType"] = checksum_type

        add_encryption_to_response(response, s3_object=s3_multipart.object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        # TODO: missing response fields we're not currently supporting
        # - AbortDate: lifecycle related,not currently supported, todo
        # - AbortRuleId: lifecycle related, not currently supported, todo
        # - RequestCharged: todo

        return response
1✔
2329

2330
    @handler("UploadPart", expand=False)
    def upload_part(
        self,
        context: RequestContext,
        request: UploadPartRequest,
    ) -> UploadPartOutput:
        """
        Store one part of an ongoing multipart upload and register it on the multipart.

        The part is written through the storage backend first. Its checksum and Content-MD5 are verified after
        the write, and the stored part is removed again if the write or one of those verifications fails.

        :param context: the request context, used to resolve the bucket and to read the raw request headers
        :param request: the UploadPart request
        :return: output with the quoted ETag of the part, plus encryption and checksum fields if relevant
        :raises NoSuchUpload: if the upload id is unknown or belongs to another key
        :raises InvalidArgument: if the part number is not between 1 and 10000
        :raises InvalidDigest: if the provided Content-MD5 cannot be converted to an ETag
        :raises InvalidRequest: if the SSE-C parameters or the checksum algorithm do not match the multipart, or
            if the provided checksum value is not valid for its algorithm
        :raises BadDigest: if the provided checksum or Content-MD5 differs from the calculated one
        """
        # TODO: missing following parameters:
        #  content_length: ContentLength = None, ->validate?
        #  content_md5: ContentMD5 = None, -> validate?
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        upload_id = request.get("UploadId")
        s3_multipart = s3_bucket.multiparts.get(upload_id)
        if not s3_multipart or s3_multipart.object.key != request.get("Key"):
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        part_number = request.get("PartNumber", 0)
        if part_number < 1 or part_number > 10000:
            raise InvalidArgument(
                "Part number must be an integer between 1 and 10000, inclusive",
                ArgumentName="partNumber",
                ArgumentValue=part_number,
            )

        content_md5 = request.get("ContentMD5")
        # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
        if content_md5 and not base_64_content_md5_to_etag(content_md5):
            raise InvalidDigest(
                "The Content-MD5 you specified was invalid.",
                Content_MD5=content_md5,
            )

        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
        if checksum_algorithm:
            checksum_value = request.get(f"Checksum{checksum_algorithm.upper()}")
        else:
            checksum_value = None

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        multipart_key_md5 = s3_multipart.object.sse_key_hash
        # SSE-C must be used either by both the multipart and the part, or by neither of them
        if bool(multipart_key_md5) != bool(sse_c_key_md5):
            raise InvalidRequest(
                "The multipart upload initiate requested encryption. "
                "Subsequent part requests must include the appropriate encryption parameters."
            )
        if multipart_key_md5 and multipart_key_md5 != sse_c_key_md5:
            raise InvalidRequest(
                "The provided encryption parameters did not match the ones used originally."
            )

        s3_part = S3Part(
            part_number=part_number,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
        )
        body = request.get("Body")
        headers = context.request.headers

        # check if chunked request
        has_streaming_payload = headers.get("x-amz-content-sha256", "").startswith("STREAMING-")
        if has_streaming_payload or "aws-chunked" in headers.get("content-encoding", ""):
            # the algorithm may only be announced through the trailing headers
            if not checksum_algorithm:
                checksum_algorithm = get_s3_checksum_algorithm_from_trailing_headers(
                    headers.get("x-amz-trailer", "")
                )
            if checksum_algorithm:
                s3_part.checksum_algorithm = checksum_algorithm

            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_part)

        multipart_algorithm = s3_multipart.checksum_algorithm
        if multipart_algorithm and s3_part.checksum_algorithm != multipart_algorithm:
            error_req_checksum = checksum_algorithm.lower() if checksum_algorithm else "null"
            # the expected value is read from the multipart's object, not from the multipart itself
            object_algorithm = s3_multipart.object.checksum_algorithm
            error_mp_checksum = object_algorithm.lower() if object_algorithm else "null"
            if error_mp_checksum != "null":
                raise InvalidRequest(
                    f"Checksum Type mismatch occurred, expected checksum Type: {error_mp_checksum}, actual checksum Type: {error_req_checksum}"
                )

        stored_multipart = self._storage_backend.get_multipart(bucket_name, s3_multipart)
        with stored_multipart.open(s3_part, mode="w") as stored_s3_part:
            try:
                stored_s3_part.write(body)
            except Exception:
                # do not leave a partially written part behind
                stored_multipart.remove_part(s3_part)
                raise

            if checksum_algorithm:
                if not validate_checksum_value(s3_part.checksum_value, checksum_algorithm):
                    stored_multipart.remove_part(s3_part)
                    raise InvalidRequest(
                        f"Value for x-amz-checksum-{s3_part.checksum_algorithm.lower()} header is invalid."
                    )
                if s3_part.checksum_value != stored_s3_part.checksum:
                    stored_multipart.remove_part(s3_part)
                    raise BadDigest(
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
                    )

            if content_md5:
                calculated_md5 = etag_to_base_64_content_md5(s3_part.etag)
                if calculated_md5 != content_md5:
                    stored_multipart.remove_part(s3_part)
                    raise BadDigest(
                        "The Content-MD5 you specified did not match what we received.",
                        ExpectedDigest=content_md5,
                        CalculatedDigest=calculated_md5,
                    )

            s3_multipart.parts[part_number] = s3_part

        response = UploadPartOutput(ETag=s3_part.quoted_etag)

        add_encryption_to_response(response, s3_object=s3_multipart.object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        part_algorithm = s3_part.checksum_algorithm
        if part_algorithm:
            response[f"Checksum{part_algorithm.upper()}"] = s3_part.checksum_value

        # TODO: RequestCharged: Optional[RequestCharged]
        return response
1✔
2479

2480
    @handler("UploadPartCopy", expand=False)
    def upload_part_copy(
        self,
        context: RequestContext,
        request: UploadPartCopyRequest,
    ) -> UploadPartCopyOutput:
        """
        Create a part of an ongoing multipart upload by copying data from an existing object.

        Both the destination and the source bucket are looked up in the store of the requesting account and
        region. An optional ``CopySourceRange`` restricts the copy to a range of the source object.

        :param context: the request context, used to select the store holding both buckets
        :param request: the UploadPartCopy request
        :return: output wrapping the ETag and last-modified date of the new part, plus checksum, encryption and
            source version fields if relevant
        :raises NoSuchBucket: if the destination or the source bucket is not in the store
        :raises InvalidRequest: if looking up the source object raises MethodNotAllowed
        :raises InvalidObjectState: if the source is in an archive storage class and has no restore value
        :raises NoSuchUpload: if the upload id is unknown or belongs to another key
        :raises InvalidArgument: if the part number is not between 1 and 10000
        :raises PreconditionFailed: if one of the copy source preconditions does not hold
        """
        # TODO: handle following parameters:
        #  SSECustomerAlgorithm: Optional[SSECustomerAlgorithm]
        #  SSECustomerKey: Optional[SSECustomerKey]
        #  SSECustomerKeyMD5: Optional[SSECustomerKeyMD5]
        #  CopySourceSSECustomerAlgorithm: Optional[CopySourceSSECustomerAlgorithm]
        #  CopySourceSSECustomerKey: Optional[CopySourceSSECustomerKey]
        #  CopySourceSSECustomerKeyMD5: Optional[CopySourceSSECustomerKeyMD5]
        #  RequestPayer: Optional[RequestPayer]
        #  ExpectedBucketOwner: Optional[AccountId]
        #  ExpectedSourceBucketOwner: Optional[AccountId]
        dest_bucket = request["Bucket"]
        dest_key = request["Key"]
        store = self.get_store(context.account_id, context.region)
        # TODO: validate cross-account UploadPartCopy
        dest_s3_bucket = store.buckets.get(dest_bucket)
        if not dest_s3_bucket:
            raise NoSuchBucket("The specified bucket does not exist", BucketName=dest_bucket)

        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
            request.get("CopySource")
        )

        src_s3_bucket = store.buckets.get(src_bucket)
        if not src_s3_bucket:
            raise NoSuchBucket("The specified bucket does not exist", BucketName=src_bucket)

        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
        try:
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
        except MethodNotAllowed:
            raise InvalidRequest(
                "The source of a copy request may not specifically refer to a delete marker by version id."
            )

        source_is_archived = src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES
        if source_is_archived and not src_s3_object.restore:
            raise InvalidObjectState(
                "Operation is not valid for the source object's storage class",
                StorageClass=src_s3_object.storage_class,
            )

        upload_id = request.get("UploadId")
        s3_multipart = dest_s3_bucket.multiparts.get(upload_id)
        if not s3_multipart or s3_multipart.object.key != dest_key:
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        part_number = request.get("PartNumber", 0)
        if part_number < 1 or part_number > 10000:
            raise InvalidArgument(
                "Part number must be an integer between 1 and 10000, inclusive",
                ArgumentName="partNumber",
                ArgumentValue=part_number,
            )

        source_range = request.get("CopySourceRange")
        # TODO implement copy source IF

        # the range is parsed against the source object's size before the preconditions are evaluated
        range_data: ObjectRange | None = (
            parse_copy_source_range_header(source_range, src_s3_object.size)
            if source_range
            else None
        )

        failed_precondition = get_failed_upload_part_copy_source_preconditions(
            request, src_s3_object.last_modified, src_s3_object.etag
        )
        if failed_precondition:
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition=failed_precondition,
            )

        s3_part = S3Part(part_number=part_number)
        # the copied part uses the checksum algorithm of the multipart, if it has one
        if s3_multipart.checksum_algorithm:
            s3_part.checksum_algorithm = s3_multipart.checksum_algorithm

        stored_multipart = self._storage_backend.get_multipart(dest_bucket, s3_multipart)
        stored_multipart.copy_from_object(s3_part, src_bucket, src_s3_object, range_data)

        s3_multipart.parts[part_number] = s3_part

        # TODO: return those fields
        #     RequestCharged: Optional[RequestCharged]

        copy_result = CopyPartResult(
            ETag=s3_part.quoted_etag,
            LastModified=s3_part.last_modified,
        )
        response = UploadPartCopyOutput(CopyPartResult=copy_result)

        if src_s3_bucket.versioning_status and src_s3_object.version_id:
            response["CopySourceVersionId"] = src_s3_object.version_id

        part_algorithm = s3_part.checksum_algorithm
        if part_algorithm:
            copy_result[f"Checksum{part_algorithm.upper()}"] = s3_part.checksum_value

        add_encryption_to_response(response, s3_object=s3_multipart.object)

        return response
1✔
2587

2588
    def complete_multipart_upload(
1✔
2589
        self,
2590
        context: RequestContext,
2591
        bucket: BucketName,
2592
        key: ObjectKey,
2593
        upload_id: MultipartUploadId,
2594
        multipart_upload: CompletedMultipartUpload = None,
2595
        checksum_crc32: ChecksumCRC32 = None,
2596
        checksum_crc32_c: ChecksumCRC32C = None,
2597
        checksum_crc64_nvme: ChecksumCRC64NVME = None,
2598
        checksum_sha1: ChecksumSHA1 = None,
2599
        checksum_sha256: ChecksumSHA256 = None,
2600
        checksum_type: ChecksumType = None,
2601
        mpu_object_size: MpuObjectSize = None,
2602
        request_payer: RequestPayer = None,
2603
        expected_bucket_owner: AccountId = None,
2604
        if_match: IfMatch = None,
2605
        if_none_match: IfNoneMatch = None,
2606
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2607
        sse_customer_key: SSECustomerKey = None,
2608
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2609
        **kwargs,
2610
    ) -> CompleteMultipartUploadOutput:
2611
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2612

2613
        if (
1✔
2614
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2615
            or s3_multipart.object.key != key
2616
        ):
2617
            raise NoSuchUpload(
1✔
2618
                "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
2619
                UploadId=upload_id,
2620
            )
2621

2622
        if if_none_match and if_match:
1✔
2623
            raise NotImplementedException(
2624
                "A header you provided implies functionality that is not implemented",
2625
                Header="If-Match,If-None-Match",
2626
                additionalMessage="Multiple conditional request headers present in the request",
2627
            )
2628

2629
        elif if_none_match:
1✔
2630
            if if_none_match != "*":
1✔
2631
                raise NotImplementedException(
2632
                    "A header you provided implies functionality that is not implemented",
2633
                    Header="If-None-Match",
2634
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2635
                )
2636
            if object_exists_for_precondition_write(s3_bucket, key):
1✔
2637
                raise PreconditionFailed(
1✔
2638
                    "At least one of the pre-conditions you specified did not hold",
2639
                    Condition="If-None-Match",
2640
                )
2641
            elif s3_multipart.precondition:
1✔
2642
                raise ConditionalRequestConflict(
1✔
2643
                    "The conditional request cannot succeed due to a conflicting operation against this resource.",
2644
                    Condition="If-None-Match",
2645
                    Key=key,
2646
                )
2647

2648
        elif if_match:
1✔
2649
            if if_match == "*":
1✔
2650
                raise NotImplementedException(
2651
                    "A header you provided implies functionality that is not implemented",
2652
                    Header="If-None-Match",
2653
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2654
                )
2655
            verify_object_equality_precondition_write(
1✔
2656
                s3_bucket, key, if_match, initiated=s3_multipart.initiated
2657
            )
2658

2659
        parts = multipart_upload.get("Parts", [])
1✔
2660
        if not parts:
1✔
2661
            raise InvalidRequest("You must specify at least one part")
1✔
2662

2663
        parts_numbers = [part.get("PartNumber") for part in parts]
1✔
2664
        # TODO: it seems that with new S3 data integrity, sorting might not be mandatory depending on checksum type
2665
        # see https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
2666
        # sorted is very fast (fastest) if the list is already sorted, which should be the case
2667
        if sorted(parts_numbers) != parts_numbers:
1✔
2668
            raise InvalidPartOrder(
1✔
2669
                "The list of parts was not in ascending order. Parts must be ordered by part number.",
2670
                UploadId=upload_id,
2671
            )
2672

2673
        mpu_checksum_algorithm = s3_multipart.checksum_algorithm
1✔
2674
        mpu_checksum_type = getattr(s3_multipart, "checksum_type", None)
1✔
2675

2676
        if checksum_type and checksum_type != mpu_checksum_type:
1✔
2677
            raise InvalidRequest(
1✔
2678
                f"The upload was created using the {mpu_checksum_type or 'null'} checksum mode. "
2679
                f"The complete request must use the same checksum mode."
2680
            )
2681

2682
        # generate the versionId before completing, in case the bucket versioning status has changed between
2683
        # creation and completion? AWS validate this
2684
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
2685
        s3_multipart.object.version_id = version_id
1✔
2686

2687
        # we're inspecting the signature of `complete_multipart`, in case the multipart has been restored from
2688
        # persistence. if we do not have a new version, do not validate those parameters
2689
        # TODO: remove for next major version (minor?)
2690
        if signature(s3_multipart.complete_multipart).parameters.get("mpu_size"):
1✔
2691
            checksum_algorithm = mpu_checksum_algorithm.lower() if mpu_checksum_algorithm else None
1✔
2692
            checksum_map = {
1✔
2693
                "crc32": checksum_crc32,
2694
                "crc32c": checksum_crc32_c,
2695
                "crc64nvme": checksum_crc64_nvme,
2696
                "sha1": checksum_sha1,
2697
                "sha256": checksum_sha256,
2698
            }
2699
            checksum_value = checksum_map.get(checksum_algorithm)
1✔
2700
            s3_multipart.complete_multipart(
1✔
2701
                parts, mpu_size=mpu_object_size, validation_checksum=checksum_value
2702
            )
2703
            if mpu_checksum_algorithm and (
1✔
2704
                (
2705
                    checksum_value
2706
                    and mpu_checksum_type == ChecksumType.FULL_OBJECT
2707
                    and not checksum_type
2708
                )
2709
                or any(
2710
                    checksum_value
2711
                    for checksum_type, checksum_value in checksum_map.items()
2712
                    if checksum_type != checksum_algorithm
2713
                )
2714
            ):
2715
                # this is not ideal, but this validation comes last... after the validation of individual parts
2716
                s3_multipart.object.parts.clear()
1✔
2717
                raise BadDigest(
1✔
2718
                    f"The {mpu_checksum_algorithm.lower()} you specified did not match the calculated checksum."
2719
                )
2720
        else:
2721
            s3_multipart.complete_multipart(parts)
×
2722

2723
        stored_multipart = self._storage_backend.get_multipart(bucket, s3_multipart)
1✔
2724
        stored_multipart.complete_multipart(
1✔
2725
            [s3_multipart.parts.get(part_number) for part_number in parts_numbers]
2726
        )
2727
        if not s3_multipart.checksum_algorithm and s3_multipart.object.checksum_algorithm:
1✔
2728
            with self._storage_backend.open(
1✔
2729
                bucket, s3_multipart.object, mode="r"
2730
            ) as s3_stored_object:
2731
                s3_multipart.object.checksum_value = s3_stored_object.checksum
1✔
2732
                s3_multipart.object.checksum_type = ChecksumType.FULL_OBJECT
1✔
2733

2734
        s3_object = s3_multipart.object
1✔
2735

2736
        s3_bucket.objects.set(key, s3_object)
1✔
2737

2738
        # remove the multipart now that it's complete
2739
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2740
        s3_bucket.multiparts.pop(s3_multipart.id, None)
1✔
2741

2742
        key_id = get_unique_key_id(bucket, key, version_id)
1✔
2743
        store.TAGS.tags.pop(key_id, None)
1✔
2744
        if s3_multipart.tagging:
1✔
2745
            store.TAGS.tags[key_id] = s3_multipart.tagging
×
2746

2747
        # RequestCharged: Optional[RequestCharged] TODO
2748

2749
        response = CompleteMultipartUploadOutput(
1✔
2750
            Bucket=bucket,
2751
            Key=key,
2752
            ETag=s3_object.quoted_etag,
2753
            Location=f"{get_full_default_bucket_location(bucket)}{key}",
2754
        )
2755

2756
        if s3_object.version_id:
1✔
2757
            response["VersionId"] = s3_object.version_id
×
2758

2759
        # it seems AWS is not returning checksum related fields if the object has KMS encryption ¯\_(ツ)_/¯
2760
        # but it still generates them, and they can be retrieved with regular GetObject and such operations
2761
        if s3_object.checksum_algorithm and not s3_object.kms_key_id:
1✔
2762
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
2763
            response["ChecksumType"] = s3_object.checksum_type
1✔
2764

2765
        if s3_object.expiration:
1✔
2766
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
2767

2768
        add_encryption_to_response(response, s3_object=s3_object)
1✔
2769

2770
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
2771

2772
        return response
1✔
2773

2774
    def abort_multipart_upload(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        if_match_initiated_time: IfMatchInitiatedTime = None,
        **kwargs,
    ) -> AbortMultipartUploadOutput:
        """Abort a pending multipart upload.

        The upload is looked up by ``upload_id`` in the bucket's multiparts; it is
        removed from the bucket and the storage backend is asked to remove it too.

        :raises NoSuchUpload: if no upload is registered under ``upload_id`` or if
            the registered upload targets a different object key than ``key``.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        pending_upload = target_bucket.multiparts.get(upload_id)
        if not pending_upload or pending_upload.object.key != key:
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        # unregister the upload from the bucket first, then release it in the storage backend
        target_bucket.multiparts.pop(upload_id, None)
        self._storage_backend.remove_multipart(bucket, pending_upload)

        # TODO: requestCharged
        return AbortMultipartUploadOutput()
2802

2803
    def list_parts(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        max_parts: MaxParts = None,
        part_number_marker: PartNumberMarker = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        sse_customer_algorithm: SSECustomerAlgorithm = None,
        sse_customer_key: SSECustomerKey = None,
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
        **kwargs,
    ) -> ListPartsOutput:
        """List the uploaded parts of a pending multipart upload, one page at a time.

        Parts are returned in ascending part-number order, starting strictly after
        ``part_number_marker`` (default 0) and limited to ``max_parts`` entries
        (default 1000).

        :raises NoSuchUpload: if no upload is registered under ``upload_id`` or if
            the registered upload targets a different object key than ``key``.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_multipart = s3_bucket.multiparts.get(upload_id)
        if not s3_multipart or s3_multipart.object.key != key:
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        start_after = part_number_marker or 0
        page_size = max_parts or 1000

        ordered_parts = sorted(s3_multipart.parts.items())
        highest_part_number = ordered_parts[-1][0] if ordered_parts else None

        page = []
        is_truncated = False
        for number, stored_part in ordered_parts:
            if number <= start_after:
                continue

            entry = Part(
                ETag=stored_part.quoted_etag,
                LastModified=stored_part.last_modified,
                PartNumber=number,
                Size=stored_part.size,
            )
            # per-part checksums are only reported when the upload itself declared an algorithm
            if s3_multipart.checksum_algorithm and stored_part.checksum_algorithm:
                checksum_field = f"Checksum{stored_part.checksum_algorithm.upper()}"
                entry[checksum_field] = stored_part.checksum_value
            page.append(entry)

            # the page is full; it is only truncated if this was not the very last part
            if len(page) >= page_size and stored_part.part_number != highest_part_number:
                is_truncated = True
                break

        response = ListPartsOutput(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            Initiator=s3_multipart.initiator,
            Owner=s3_multipart.initiator,
            StorageClass=s3_multipart.object.storage_class,
            IsTruncated=is_truncated,
            MaxParts=page_size,
            PartNumberMarker=start_after,
            NextPartNumberMarker=page[-1]["PartNumber"] if page else 0,
        )
        if page:
            response["Parts"] = page

        if s3_multipart.checksum_algorithm:
            response["ChecksumAlgorithm"] = s3_multipart.object.checksum_algorithm
            response["ChecksumType"] = getattr(s3_multipart, "checksum_type", None)

        #     AbortDate: Optional[AbortDate] TODO: lifecycle
        #     AbortRuleId: Optional[AbortRuleId] TODO: lifecycle
        #     RequestCharged: Optional[RequestCharged]

        return response
2885

2886
    def list_multipart_uploads(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        key_marker: KeyMarker = None,
        max_uploads: MaxUploads = None,
        prefix: Prefix = None,
        upload_id_marker: UploadIdMarker = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> ListMultipartUploadsOutput:
        """List the pending multipart uploads of a bucket, one page at a time.

        Uploads are walked in (object key, initiation time) order. ``key_marker`` and
        ``upload_id_marker`` select where the page starts, ``prefix`` filters keys and
        ``delimiter`` rolls matching keys up into common prefixes. When
        ``encoding_type`` is set, prefix, delimiter and the compared keys are
        URL-quoted.

        :raises InvalidArgument: if both markers are given but ``upload_id_marker``
            does not belong to an upload whose key equals ``key_marker``.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        def _encode(value):
            # URL-quote only when the caller requested an encoding type
            return urlparse.quote(value) if encoding_type else value

        page_size = max_uploads or 1000
        prefix = _encode(prefix or "")
        delimiter = _encode(delimiter or "")

        if key_marker and upload_id_marker:
            marker_upload = s3_bucket.multiparts.get(upload_id_marker)
            # an unknown upload ID yields None, which never equals the (truthy) key marker
            marker_key = _encode(marker_upload.object.key) if marker_upload else None
            if key_marker != marker_key:
                raise InvalidArgument(
                    "Invalid uploadId marker",
                    ArgumentName="upload-id-marker",
                    ArgumentValue=upload_id_marker,
                )

        # sort by key and initiated
        ordered_uploads = sorted(
            s3_bucket.multiparts.values(),
            key=lambda upload: (upload.object.key, upload.initiated.timestamp()),
        )
        final_upload = ordered_uploads[-1] if ordered_uploads else None

        uploads = []
        seen_prefixes = set()
        entries_seen = 0
        is_truncated = False
        passed_upload_id_marker = False

        for upload in ordered_uploads:
            upload_key = _encode(upload.object.key)

            if key_marker:
                # everything sorting before the key marker was part of a previous page
                if upload_key < key_marker:
                    continue
                if upload_key == key_marker:
                    # without an upload ID marker, the whole key is considered consumed
                    if not upload_id_marker:
                        continue
                    # uploads of one key are ordered by time: resume right after the marker
                    if upload.id == upload_id_marker:
                        passed_upload_id_marker = True
                        continue
                    if not passed_upload_id_marker:
                        continue

            # Filter for keys that start with prefix
            if prefix and not upload_key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            rolled_up_prefix = None
            if delimiter and delimiter in (remainder := upload_key.removeprefix(prefix)):
                before_delimiter = remainder.partition(delimiter)[0]
                rolled_up_prefix = f"{prefix}{before_delimiter}{delimiter}"

                already_reported = rolled_up_prefix in seen_prefixes
                if already_reported or (key_marker and key_marker.startswith(rolled_up_prefix)):
                    continue

            if rolled_up_prefix:
                seen_prefixes.add(rolled_up_prefix)
            else:
                upload_entry = MultipartUpload(
                    UploadId=upload.id,
                    Key=upload.object.key,
                    Initiated=upload.initiated,
                    StorageClass=upload.object.storage_class,
                    Owner=upload.initiator,  # TODO: check the difference
                    Initiator=upload.initiator,
                )
                if upload.checksum_algorithm:
                    upload_entry["ChecksumAlgorithm"] = upload.checksum_algorithm
                    upload_entry["ChecksumType"] = getattr(upload, "checksum_type", None)

                uploads.append(upload_entry)

            # both uploads and new common prefixes count towards the page size
            entries_seen += 1
            if entries_seen >= page_size and final_upload.id != upload.id:
                is_truncated = True
                break

        common_prefixes = [CommonPrefix(Prefix=entry) for entry in sorted(seen_prefixes)]
        newest_entry = uploads[-1] if uploads else None

        response = ListMultipartUploadsOutput(
            Bucket=bucket,
            IsTruncated=is_truncated,
            MaxUploads=page_size,
            KeyMarker=key_marker or "",
            UploadIdMarker=(upload_id_marker or "") if key_marker else "",
            NextKeyMarker=newest_entry["Key"] if newest_entry else "",
            NextUploadIdMarker=newest_entry["UploadId"] if newest_entry else "",
        )
        if uploads:
            response["Uploads"] = uploads
        if delimiter:
            response["Delimiter"] = delimiter
        if prefix:
            response["Prefix"] = prefix
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes

        return response
3017

3018
    def put_bucket_versioning(
        self,
        context: RequestContext,
        bucket: BucketName,
        versioning_configuration: VersioningConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        mfa: MFA = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Set the versioning status of a bucket to ``Enabled`` or ``Suspended``.

        :raises CommonServiceException: (``IllegalVersioningConfigurationException``)
            if the configuration carries no ``Status``.
        :raises MalformedXML: if ``Status`` is neither ``Enabled`` nor ``Suspended``.
        :raises InvalidBucketState: if versioning would be suspended on a bucket that
            has object lock enabled.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        requested_status = versioning_configuration.get("Status")
        if not requested_status:
            raise CommonServiceException(
                code="IllegalVersioningConfigurationException",
                message="The Versioning element must be specified",
            )

        if requested_status not in ("Enabled", "Suspended"):
            raise MalformedXML()

        suspending = requested_status == "Suspended"
        if s3_bucket.object_lock_enabled and suspending:
            raise InvalidBucketState(
                "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed."
            )

        # the first time a status is set, the object store is converted to a versioned one
        if not s3_bucket.versioning_status:
            s3_bucket.objects = VersionedKeyStore.from_key_store(s3_bucket.objects)

        s3_bucket.versioning_status = requested_status
3048

3049
    def get_bucket_versioning(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketVersioningOutput:
        """Return the bucket's versioning status.

        The ``Status`` field is only present when a status has been set on the bucket;
        otherwise an empty output is returned.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        response = GetBucketVersioningOutput()
        if current_status := s3_bucket.versioning_status:
            response["Status"] = current_status

        return response
3062

3063
    def get_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketEncryptionOutput:
        """Return the bucket's encryption rule wrapped in a one-element ``Rules`` list.

        An empty output is returned when the bucket has no encryption rule.
        """
        # AWS now encrypts bucket by default with AES256, see:
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/default-bucket-encryption.html
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        response = GetBucketEncryptionOutput()
        if active_rule := s3_bucket.encryption_rule:
            response["ServerSideEncryptionConfiguration"] = {"Rules": [active_rule]}

        return response
3080

3081
    def put_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        server_side_encryption_configuration: ServerSideEncryptionConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate and store the default encryption rule of a bucket.

        Exactly one rule with an ``ApplyServerSideEncryptionByDefault`` element is
        accepted; that rule is stored as-is on the bucket.

        :raises MalformedXML: if there are no rules, more than one rule, the rule has
            no ``ApplyServerSideEncryptionByDefault``, or its ``SSEAlgorithm`` is
            missing or not in ``SSE_ALGORITHMS``.
        :raises InvalidArgument: if a ``KMSMasterKeyID`` is supplied for an algorithm
            that is neither ``aws:kms`` nor ``aws:kms:dsse``.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (rules := server_side_encryption_configuration.get("Rules")):
            raise MalformedXML()

        if len(rules) != 1 or not (
            encryption := rules[0].get("ApplyServerSideEncryptionByDefault")
        ):
            raise MalformedXML()

        if not (sse_algorithm := encryption.get("SSEAlgorithm")):
            raise MalformedXML()

        if sse_algorithm not in SSE_ALGORITHMS:
            raise MalformedXML()

        # A KMS key is applicable to both KMS-backed algorithms, as the error message below
        # states. Previously only `aws:kms` was exempted, so `aws:kms:dsse` with a key was
        # rejected by the very check whose message says it is allowed.
        kms_backed_algorithms = (ServerSideEncryption.aws_kms, "aws:kms:dsse")
        if sse_algorithm not in kms_backed_algorithms and "KMSMasterKeyID" in encryption:
            raise InvalidArgument(
                "a KMSMasterKeyID is not applicable if the default sse algorithm is not aws:kms or aws:kms:dsse",
                ArgumentName="ApplyServerSideEncryptionByDefault",
            )
        # elif master_kms_key := encryption.get("KMSMasterKeyID"):
        # TODO: validate KMS key? not currently done in moto
        # You can pass either the KeyId or the KeyArn. If cross-account, it has to be the ARN.
        # It's always saved as the ARN in the bucket configuration.
        # kms_key_arn = get_kms_key_arn(master_kms_key, s3_bucket.bucket_account_id)
        # encryption["KMSMasterKeyID"] = master_kms_key

        s3_bucket.encryption_rule = rules[0]
3120

3121
    def delete_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Clear the encryption rule stored on the bucket."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        target_bucket.encryption_rule = None
3131

3132
    def put_bucket_notification_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        notification_configuration: NotificationConfiguration,
        expected_bucket_owner: AccountId = None,
        skip_destination_validation: SkipValidation = None,
        **kwargs,
    ) -> None:
        """Verify a notification configuration and store it on the bucket.

        The bucket is resolved first; the configuration is only stored if
        ``_verify_notification_configuration`` returns without raising.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        self._verify_notification_configuration(
            notification_configuration,
            skip_destination_validation,
            context,
            bucket,
        )

        target_bucket.notification_configuration = notification_configuration
3147

3148
    def get_bucket_notification_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> NotificationConfiguration:
        """Return the bucket's notification configuration, or an empty one if unset."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        stored_configuration = target_bucket.notification_configuration
        if stored_configuration:
            return stored_configuration

        return NotificationConfiguration()
3158

3159
    def put_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        tagging: Tagging,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Replace the whole tag set of a bucket with the one supplied in ``tagging``.

        :raises MalformedXML: if ``tagging`` has no ``TagSet`` entry.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if "TagSet" not in tagging:
            raise MalformedXML()

        requested_tags = tagging["TagSet"] or []
        validate_tag_set(requested_tags, type_set="bucket")

        # a put overwrites the whole TagSet: drop what was there before tagging again
        tag_registry = store.TAGS
        tag_registry.tags.pop(s3_bucket.bucket_arn, None)
        tag_registry.tag_resource(s3_bucket.bucket_arn, tags=requested_tags)
3180

3181
    def get_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketTaggingOutput:
        """Return the tag set registered for the bucket's ARN.

        :raises NoSuchTagSet: if no tags are registered for the bucket.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        listing = store.TAGS.list_tags_for_resource(s3_bucket.bucket_arn, root_name="Tags")
        bucket_tags = listing["Tags"]
        if bucket_tags:
            return GetBucketTaggingOutput(TagSet=bucket_tags)

        raise NoSuchTagSet(
            "The TagSet does not exist",
            BucketName=bucket,
        )
3197

3198
    def delete_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove any tags registered for the bucket's ARN; a no-op if there are none."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        bucket_arn = s3_bucket.bucket_arn
        store.TAGS.tags.pop(bucket_arn, None)
3208

3209
    def put_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        tagging: Tagging,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> PutObjectTaggingOutput:
        """Replace the whole tag set of an object (version) and send a notification.

        The object is resolved before the tag set is validated, so lookup errors
        raised by ``get_object`` take precedence over tagging errors.

        :raises MalformedXML: if ``tagging`` has no ``TagSet`` entry.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="PUT")

        if "TagSet" not in tagging:
            raise MalformedXML()

        requested_tags = tagging["TagSet"] or []
        validate_tag_set(requested_tags, type_set="object")

        # tags are keyed by the resolved object's own version, not the requested one
        resolved_version = s3_object.version_id
        tag_key = get_unique_key_id(bucket, key, resolved_version)

        # a put overwrites the whole TagSet: drop what was there before tagging again
        tag_registry = store.TAGS
        tag_registry.tags.pop(tag_key, None)
        tag_registry.tag_resource(tag_key, tags=requested_tags)

        response = PutObjectTaggingOutput()
        if resolved_version:
            response["VersionId"] = resolved_version

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
3243

3244
    def get_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetObjectTaggingOutput:
        """Return the tag set of an object (version).

        :raises MethodNotAllowed: if the lookup fails with ``NoSuchKey`` on a bucket
            with a versioning status while the object store still holds an entry for
            the key/version (reported as a delete marker).
        :raises NoSuchKey: if the object does not exist; the reported ``Key`` is
            rewritten to ``<bucket>/<key>``.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        try:
            s3_object = s3_bucket.get_object(key=key, version_id=version_id)
        except NoSuchKey as missing_key_error:
            # it seems GetObjectTagging does not work like all other operations, so a different
            # exception is needed here. As the exception has to be caught anyway because of the
            # format of the Key, it is not worth modifying the `S3Bucket.get_object` signature
            # for a single operation.
            delete_marker = (
                s3_bucket.objects.get(key, version_id) if s3_bucket.versioning_status else None
            )
            if delete_marker:
                raise MethodNotAllowed(
                    "The specified method is not allowed against this resource.",
                    Method="GET",
                    ResourceType="DeleteMarker",
                    DeleteMarker=True,
                    Allow="DELETE",
                    VersionId=delete_marker.version_id,
                )

            # There is a weird AWS-validated bug in S3: the returned key contains the bucket
            # name as well. Follow AWS on this one.
            missing_key_error.Key = f"{bucket}/{key}"
            raise missing_key_error

        resolved_version = s3_object.version_id
        tag_key = get_unique_key_id(bucket, key, resolved_version)
        object_tags = store.TAGS.list_tags_for_resource(tag_key)["Tags"]

        response = GetObjectTaggingOutput(TagSet=object_tags)
        if resolved_version:
            response["VersionId"] = resolved_version

        return response
3287

3288
    def delete_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> DeleteObjectTaggingOutput:
        """Remove the tag set of an object (version) and send a notification.

        The object is resolved through ``get_object`` first, so lookup errors for a
        missing key or version propagate to the caller. The response carries
        ``VersionId`` when the resolved object has one.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="DELETE")

        # Tags are stored under the resolved object's version (see `put_object_tagging`), which
        # differs from the request's `version_id` when the caller did not specify one on a
        # versioned bucket. Using the request value there would leave the tags in place.
        tag_key = get_unique_key_id(bucket, key, s3_object.version_id)
        store.TAGS.tags.pop(tag_key, None)

        response = DeleteObjectTaggingOutput()
        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
3309

3310
    def put_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        cors_configuration: CORSConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate a CORS configuration, store it on the bucket and invalidate the CORS cache."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        validate_cors_configuration(cors_configuration)
        target_bucket.cors_rules = cors_configuration

        # the CORS handler's cache is invalidated after the rules changed
        self._cors_handler.invalidate_cache()
3324

3325
    def get_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketCorsOutput:
        """Return the CORS rules stored on the bucket.

        :raises NoSuchCORSConfiguration: if the bucket has no CORS configuration.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if target_bucket.cors_rules:
            return GetBucketCorsOutput(CORSRules=target_bucket.cors_rules["CORSRules"])

        raise NoSuchCORSConfiguration(
            "The CORS configuration does not exist",
            BucketName=bucket,
        )
3340

3341
    def delete_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Drop the bucket's CORS configuration; a no-op if none is stored."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if not target_bucket.cors_rules:
            # nothing stored: the cache is left untouched
            return

        self._cors_handler.invalidate_cache()
        target_bucket.cors_rules = None
3353

3354
    def get_bucket_lifecycle_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLifecycleConfigurationOutput:
        """Return the bucket's lifecycle rules and its transition minimum object size.

        :raises NoSuchLifecycleConfiguration: if the bucket has no lifecycle rules.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        stored_rules = target_bucket.lifecycle_rules
        if not stored_rules:
            raise NoSuchLifecycleConfiguration(
                "The lifecycle configuration does not exist",
                BucketName=bucket,
            )

        # TODO: remove for next major version, safe access to new attribute
        minimum_object_size = getattr(
            target_bucket,
            "transition_default_minimum_object_size",
            TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
        )

        return GetBucketLifecycleConfigurationOutput(
            Rules=stored_rules,
            TransitionDefaultMinimumObjectSize=minimum_object_size,
        )
3378

3379
    def put_bucket_lifecycle_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        checksum_algorithm: ChecksumAlgorithm = None,
        lifecycle_configuration: BucketLifecycleConfiguration = None,
        expected_bucket_owner: AccountId = None,
        transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize = None,
        **kwargs,
    ) -> PutBucketLifecycleConfigurationOutput:
        """Validate and store the lifecycle rules of a bucket.

        The transition minimum object size defaults to ``all_storage_classes_128K``
        when not supplied; the effective value is stored on the bucket and echoed in
        the response. The bucket's entry in the expiration cache is cleared.

        :raises InvalidRequest: if the transition minimum object size is not one of
            the two supported values.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if transition_default_minimum_object_size:
            minimum_object_size = transition_default_minimum_object_size
        else:
            minimum_object_size = TransitionDefaultMinimumObjectSize.all_storage_classes_128K

        supported_sizes = (
            TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
            TransitionDefaultMinimumObjectSize.varies_by_storage_class,
        )
        if minimum_object_size not in supported_sizes:
            raise InvalidRequest(
                f"Invalid TransitionDefaultMinimumObjectSize found: {minimum_object_size}"
            )

        validate_lifecycle_configuration(lifecycle_configuration)

        # TODO: we either apply the lifecycle to existing objects when we set the new rules, or
        #  we need to apply them everytime we get/head an object
        # for now, we keep a cache and get it everytime we fetch an object
        target_bucket.lifecycle_rules = lifecycle_configuration["Rules"]
        target_bucket.transition_default_minimum_object_size = minimum_object_size
        self._expiration_cache[bucket].clear()

        return PutBucketLifecycleConfigurationOutput(
            TransitionDefaultMinimumObjectSize=minimum_object_size
        )
3414

3415
    def delete_bucket_lifecycle(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the bucket's lifecycle rules and clear its expiration cache entry."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        target_bucket.lifecycle_rules = None

        # the bucket's expiration cache entry is cleared along with the rules
        bucket_expiration_cache = self._expiration_cache[bucket]
        bucket_expiration_cache.clear()
3426

3427
    def put_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        analytics_configuration: AnalyticsConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate an analytics configuration and store it on the bucket under ``id``.

        An existing configuration with the same ``id`` is overwritten.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        validate_bucket_analytics_configuration(
            id=id,
            analytics_configuration=analytics_configuration,
        )

        configurations = target_bucket.analytics_configurations
        configurations[id] = analytics_configuration
3443

3444
    def get_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketAnalyticsConfigurationOutput:
        """Return the analytics configuration stored on the bucket under ``id``.

        :raises NoSuchConfiguration: if nothing (or an empty value) is stored under ``id``
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        stored_config = target_bucket.analytics_configurations.get(id)
        if not stored_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")

        return GetBucketAnalyticsConfigurationOutput(AnalyticsConfiguration=stored_config)
3458

3459
    def list_bucket_analytics_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketAnalyticsConfigurationsOutput:
        """List every analytics configuration of the bucket, ordered by its ``Id``.

        The result is never paginated: ``continuation_token`` is not read and
        ``IsTruncated`` is always ``False``.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        configs_by_id = sorted(
            target_bucket.analytics_configurations.values(),
            key=itemgetter("Id"),
        )
        return ListBucketAnalyticsConfigurationsOutput(
            IsTruncated=False,
            AnalyticsConfigurationList=configs_by_id,
        )
3476

3477
    def delete_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the analytics configuration stored on the bucket under ``id``.

        :raises NoSuchConfiguration: if nothing (or an empty value) was stored under ``id``
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        # pop with a default so a missing id is reported as a service error, not a KeyError
        removed_config = target_bucket.analytics_configurations.pop(id, None)
        if not removed_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")
3489

3490
    def put_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        intelligent_tiering_configuration: IntelligentTieringConfiguration,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> None:
        """Validate an Intelligent-Tiering configuration and store it on the bucket under ``id``.

        ``expected_bucket_owner`` is forwarded to the bucket lookup, the same way the
        bucket policy operations of this provider do, instead of being silently ignored.
        NOTE(review): the lookup helper is expected to reject a mismatching owner -
        confirm against ``_get_cross_account_bucket``.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # validate first; the assignment below only runs once the validator has returned
        validate_bucket_intelligent_tiering_configuration(id, intelligent_tiering_configuration)

        s3_bucket.intelligent_tiering_configurations[id] = intelligent_tiering_configuration
3505

3506
    def get_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> GetBucketIntelligentTieringConfigurationOutput:
        """Return the Intelligent-Tiering configuration stored on the bucket under ``id``.

        ``expected_bucket_owner`` is forwarded to the bucket lookup, the same way the
        bucket policy operations of this provider do, instead of being silently ignored.
        NOTE(review): the lookup helper is expected to reject a mismatching owner -
        confirm against ``_get_cross_account_bucket``.

        :raises NoSuchConfiguration: if nothing (or an empty value) is stored under ``id``
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not (itier_config := s3_bucket.intelligent_tiering_configurations.get(id)):
            raise NoSuchConfiguration("The specified configuration does not exist.")

        return GetBucketIntelligentTieringConfigurationOutput(
            IntelligentTieringConfiguration=itier_config
        )
3523

3524
    def delete_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> None:
        """Remove the Intelligent-Tiering configuration stored on the bucket under ``id``.

        ``expected_bucket_owner`` is forwarded to the bucket lookup, the same way the
        bucket policy operations of this provider do, instead of being silently ignored.
        NOTE(review): the lookup helper is expected to reject a mismatching owner -
        confirm against ``_get_cross_account_bucket``.

        :raises NoSuchConfiguration: if nothing (or an empty value) was stored under ``id``
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # pop with a default so a missing id is reported as a service error, not a KeyError
        if not s3_bucket.intelligent_tiering_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")
3537

3538
    def list_bucket_intelligent_tiering_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token | None = None,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> ListBucketIntelligentTieringConfigurationsOutput:
        """List every Intelligent-Tiering configuration of the bucket, ordered by its ``Id``.

        The result is never paginated: ``continuation_token`` is not read and
        ``IsTruncated`` is always ``False``.

        ``expected_bucket_owner`` is forwarded to the bucket lookup, the same way the
        bucket policy operations of this provider do, instead of being silently ignored.
        NOTE(review): the lookup helper is expected to reject a mismatching owner -
        confirm against ``_get_cross_account_bucket``.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        return ListBucketIntelligentTieringConfigurationsOutput(
            IsTruncated=False,
            IntelligentTieringConfigurationList=sorted(
                s3_bucket.intelligent_tiering_configurations.values(),
                key=itemgetter("Id"),
            ),
        )
3556

3557
    def put_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        inventory_configuration: InventoryConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate an inventory configuration and store it on the bucket under ``id``.

        A configuration already stored under the same ``id`` is overwritten.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        # validate first; the assignment below only runs once the validator has returned
        validate_inventory_configuration(
            config_id=id,
            inventory_configuration=inventory_configuration,
        )
        target_bucket.inventory_configurations[id] = inventory_configuration
3572

3573
    def get_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketInventoryConfigurationOutput:
        """Return the inventory configuration stored on the bucket under ``id``.

        :raises NoSuchConfiguration: if nothing (or an empty value) is stored under ``id``
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        stored_config = target_bucket.inventory_configurations.get(id)
        if not stored_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")

        return GetBucketInventoryConfigurationOutput(InventoryConfiguration=stored_config)
3586

3587
    def list_bucket_inventory_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketInventoryConfigurationsOutput:
        """List every inventory configuration of the bucket, ordered by its ``Id``.

        The result is never paginated: ``continuation_token`` is not read and
        ``IsTruncated`` is always ``False``.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        configs_by_id = sorted(
            target_bucket.inventory_configurations.values(),
            key=itemgetter("Id"),
        )
        return ListBucketInventoryConfigurationsOutput(
            IsTruncated=False,
            InventoryConfigurationList=configs_by_id,
        )
3603

3604
    def delete_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the inventory configuration stored on the bucket under ``id``.

        :raises NoSuchConfiguration: if nothing (or an empty value) was stored under ``id``
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        # pop with a default so a missing id is reported as a service error, not a KeyError
        removed_config = target_bucket.inventory_configurations.pop(id, None)
        if not removed_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")
3616

3617
    def get_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketWebsiteOutput:
        """Return the website configuration stored on the bucket.

        The stored configuration object is returned as-is.

        :raises NoSuchWebsiteConfiguration: if the bucket has no website configuration
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        website_config = target_bucket.website_configuration
        if website_config:
            return website_config

        raise NoSuchWebsiteConfiguration(
            "The specified bucket does not have a website configuration",
            BucketName=bucket,
        )
3632

3633
    def put_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        website_configuration: WebsiteConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate a website configuration and store it on the bucket, replacing any old one."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        # validate first; the assignment below only runs once the validator has returned
        validate_website_configuration(website_configuration)

        target_bucket.website_configuration = website_configuration
3647

3648
    def delete_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Clear the website configuration of the bucket.

        No error is raised when the bucket had no website configuration to begin with.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        target_bucket.website_configuration = None
3658

3659
    def get_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLockConfigurationOutput:
        """Return the Object Lock configuration of the bucket.

        The ``Rule`` entry is only present when the bucket has a default retention.

        :raises ObjectLockConfigurationNotFoundError: if Object Lock is not enabled on the bucket
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        if not target_bucket.object_lock_enabled:
            raise ObjectLockConfigurationNotFoundError(
                "Object Lock configuration does not exist for this bucket",
                BucketName=bucket,
            )

        lock_config = ObjectLockConfiguration(ObjectLockEnabled=ObjectLockEnabled.Enabled)

        default_retention = target_bucket.object_lock_default_retention
        if default_retention:
            lock_config["Rule"] = {"DefaultRetention": default_retention}

        return GetObjectLockConfigurationOutput(ObjectLockConfiguration=lock_config)
3684

3685
    def put_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        object_lock_configuration: ObjectLockConfiguration = None,
        request_payer: RequestPayer = None,
        token: ObjectLockToken = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLockConfigurationOutput:
        """Enable Object Lock on the bucket and set (or clear) its default retention.

        Without a ``Rule`` entry the default retention is cleared. With one, the rule must
        carry a ``DefaultRetention`` holding a known ``Mode`` and exactly one of ``Days``
        and ``Years``.

        :raises InvalidBucketState: if versioning is not ``Enabled`` on the bucket
        :raises MalformedXML: if the configuration is missing, not ``Enabled``, or its rule
            does not satisfy the constraints above
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        if target_bucket.versioning_status != "Enabled":
            raise InvalidBucketState(
                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration"
            )

        if (
            not object_lock_configuration
            or object_lock_configuration.get("ObjectLockEnabled") != "Enabled"
        ):
            raise MalformedXML()

        if "Rule" in object_lock_configuration:
            rule = object_lock_configuration["Rule"]
            default_retention = rule.get("DefaultRetention") if rule else None
            if not default_retention:
                raise MalformedXML()

            has_days = "Days" in default_retention
            has_years = "Years" in default_retention
            # equal flags mean both periods were given, or neither: both are invalid
            if "Mode" not in default_retention or has_days == has_years:
                raise MalformedXML()

            if default_retention["Mode"] not in OBJECT_LOCK_MODES:
                raise MalformedXML()
        else:
            # no rule at all: keep Object Lock enabled but drop the default retention
            default_retention = None

        target_bucket.object_lock_default_retention = default_retention
        if not target_bucket.object_lock_enabled:
            target_bucket.object_lock_enabled = True

        return PutObjectLockConfigurationOutput()
3734

3735
    def get_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLegalHoldOutput:
        """Return the legal hold status of an object (version).

        :raises InvalidRequest: if Object Lock is not enabled on the bucket
        :raises NoSuchObjectLockConfiguration: if the object has no legal hold status
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        if not target_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = target_bucket.get_object(key=key, version_id=version_id, http_method="GET")

        legal_hold_status = s3_object.lock_legal_status
        if not legal_hold_status:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        return GetObjectLegalHoldOutput(LegalHold=ObjectLockLegalHold(Status=legal_hold_status))
3762

3763
    def put_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        legal_hold: ObjectLockLegalHold = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLegalHoldOutput:
        """Set the legal hold status (``ON`` / ``OFF``) of an object (version).

        :raises MalformedXML: if ``legal_hold`` is missing, or its ``Status`` is missing or
            not one of ``ON`` / ``OFF``
        :raises InvalidRequest: if Object Lock is not enabled on the bucket
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not legal_hold:
            raise MalformedXML()

        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        # the object is resolved before the status is validated, so an error from the object
        # lookup takes precedence over a malformed status
        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )
        # TODO: check casing
        if not (status := legal_hold.get("Status")) or status not in ("ON", "OFF"):
            raise MalformedXML()

        s3_object.lock_legal_status = status

        # TODO: return RequestCharged
        # return the output type of this operation (previously the retention output was built)
        return PutObjectLegalHoldOutput()
3797

3798
    def get_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectRetentionOutput:
        """Return the retention mode and retain-until date of an object (version).

        :raises InvalidRequest: if Object Lock is not enabled on the bucket
        :raises NoSuchObjectLockConfiguration: if the object has no lock mode
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        if not target_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = target_bucket.get_object(key=key, version_id=version_id, http_method="GET")
        if not s3_object.lock_mode:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        current_retention = ObjectLockRetention(
            Mode=s3_object.lock_mode,
            RetainUntilDate=s3_object.lock_until,
        )
        return GetObjectRetentionOutput(Retention=current_retention)
3828

3829
    def put_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        retention: ObjectLockRetention = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectRetentionOutput:
        """Set, extend or remove the retention of an object (version).

        An empty ``retention`` removes the lock. A request that weakens the current lock
        (removal, an earlier date, or COMPLIANCE -> GOVERNANCE) is refused for COMPLIANCE
        objects, and for GOVERNANCE objects unless ``bypass_governance_retention`` is set.

        :raises InvalidRequest: if Object Lock is not enabled on the bucket
        :raises MalformedXML: if ``retention`` is given but lacks a valid ``Mode`` or
            ``RetainUntilDate``
        :raises InvalidArgument: if ``RetainUntilDate`` is in the past
        :raises AccessDenied: if the existing lock forbids the change
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        if not target_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = target_bucket.get_object(key=key, version_id=version_id, http_method="PUT")

        if retention:
            is_well_formed = validate_dict_fields(
                retention, required_fields={"Mode", "RetainUntilDate"}
            )
            if not is_well_formed or retention["Mode"] not in OBJECT_LOCK_MODES:
                raise MalformedXML()

            new_mode = retention["Mode"]
            new_until = retention["RetainUntilDate"]
            if new_until < datetime.datetime.now(datetime.UTC):
                # weirdly, this date is formatted as follows: Tue Dec 31 16:00:00 PST 2019
                # it contains the timezone as PST, even if you target a bucket in Europe or Asia
                pacific_time = new_until.astimezone(tz=ZoneInfo("America/Los_Angeles"))
                raise InvalidArgument(
                    "The retain until date must be in the future!",
                    ArgumentName="RetainUntilDate",
                    ArgumentValue=pacific_time.strftime("%a %b %d %H:%M:%S %Z %Y"),
                )

            # shortening the period or downgrading COMPLIANCE to GOVERNANCE weakens the lock
            weakens_lock = (s3_object.lock_until and s3_object.lock_until > new_until) or (
                new_mode == ObjectLockMode.GOVERNANCE
                and s3_object.lock_mode == ObjectLockMode.COMPLIANCE
            )
        else:
            # no retention in the request means removing the lock altogether
            new_mode = new_until = None
            weakens_lock = True

        current_mode = s3_object.lock_mode
        if weakens_lock and (
            current_mode == ObjectLockMode.COMPLIANCE
            or (current_mode == ObjectLockMode.GOVERNANCE and not bypass_governance_retention)
        ):
            raise AccessDenied("Access Denied because object protected by object lock.")

        s3_object.lock_mode = new_mode
        s3_object.lock_until = new_until

        # TODO: return RequestCharged
        return PutObjectRetentionOutput()
3892

3893
    def put_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        request_payment_configuration: RequestPaymentConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Record who pays for requests against the bucket.

        :raises MalformedXML: if ``Payer`` is neither ``Requester`` nor ``BucketOwner``
        """
        # TODO: this currently only mock the operation, but its actual effect is not emulated
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        requested_payer = request_payment_configuration.get("Payer")
        if requested_payer not in ("Requester", "BucketOwner"):
            raise MalformedXML()

        target_bucket.payer = requested_payer
3911

3912
    def get_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketRequestPaymentOutput:
        """Return the payer currently recorded on the bucket."""
        # TODO: this currently only mock the operation, but its actual effect is not emulated
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        current_payer = target_bucket.payer
        return GetBucketRequestPaymentOutput(Payer=current_payer)
3923

3924
    def get_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketOwnershipControlsOutput:
        """Return the ownership controls of the bucket as a single-rule list.

        :raises OwnershipControlsNotFoundError: if no object ownership is set on the bucket
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        object_ownership = target_bucket.object_ownership
        if not object_ownership:
            raise OwnershipControlsNotFoundError(
                "The bucket ownership controls were not found",
                BucketName=bucket,
            )

        ownership_rule = {"ObjectOwnership": object_ownership}
        return GetBucketOwnershipControlsOutput(OwnershipControls={"Rules": [ownership_rule]})
3942

3943
    def put_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        ownership_controls: OwnershipControls,
        content_md5: ContentMD5 | None = None,
        expected_bucket_owner: AccountId | None = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        **kwargs,
    ) -> None:
        """Store the object ownership setting taken from the single rule of the request.

        :raises MalformedXML: if the request does not contain exactly one rule, or the rule's
            ``ObjectOwnership`` is not a known value
        """
        # TODO: this currently only mock the operation, but its actual effect is not emulated
        #  it for example almost forbid ACL usage when set to BucketOwnerEnforced
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        rules = ownership_controls.get("Rules")
        if not rules or len(rules) > 1:
            raise MalformedXML()

        requested_ownership = rules[0].get("ObjectOwnership")
        if requested_ownership not in OBJECT_OWNERSHIPS:
            raise MalformedXML()

        target_bucket.object_ownership = requested_ownership
3965

3966
    def delete_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Clear the object ownership setting of the bucket."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        target_bucket.object_ownership = None
3976

3977
    def get_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetPublicAccessBlockOutput:
        """Return the public access block configuration stored on the bucket.

        :raises NoSuchPublicAccessBlockConfiguration: if the bucket has none
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        access_block = target_bucket.public_access_block
        if not access_block:
            raise NoSuchPublicAccessBlockConfiguration(
                "The public access block configuration was not found", BucketName=bucket
            )

        return GetPublicAccessBlockOutput(PublicAccessBlockConfiguration=access_block)
3994

3995
    def put_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        public_access_block_configuration: PublicAccessBlockConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Store the public access block configuration of the bucket.

        Any of the four known flags that is absent or ``None`` is set to ``False`` on the
        passed-in configuration (it is modified in place) before it is stored.

        :raises MalformedXML: if ``validate_dict_fields`` rejects the configuration, given
            the four flags as optional fields and no required field
        """
        # TODO: this currently only mock the operation, but its actual effect is not emulated
        #  as we do not enforce ACL directly. Also, this should take the most restrictive between S3Control and the
        #  bucket configuration. See s3control
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        known_flags = {
            "BlockPublicAcls",
            "BlockPublicPolicy",
            "IgnorePublicAcls",
            "RestrictPublicBuckets",
        }
        is_valid = validate_dict_fields(
            public_access_block_configuration,
            required_fields=set(),
            optional_fields=known_flags,
        )
        if not is_valid:
            raise MalformedXML()

        for flag in known_flags:
            # covers both a missing key and an explicit None value
            if public_access_block_configuration.get(flag) is None:
                public_access_block_configuration[flag] = False

        target_bucket.public_access_block = public_access_block_configuration
4028

4029
    def delete_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Clear the public access block configuration of the bucket."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        target_bucket.public_access_block = None
4039

4040
    def get_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyOutput:
        """Return the policy document stored on the bucket.

        ``expected_bucket_owner`` is forwarded to the bucket lookup.

        :raises NoSuchBucketPolicy: if the bucket has no policy
        """
        _, target_bucket = self._get_cross_account_bucket(
            context,
            bucket,
            expected_bucket_owner=expected_bucket_owner,
        )

        stored_policy = target_bucket.policy
        if not stored_policy:
            raise NoSuchBucketPolicy(
                "The bucket policy does not exist",
                BucketName=bucket,
            )

        return GetBucketPolicyOutput(Policy=stored_policy)
4056

4057
    def put_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        policy: Policy,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Store a policy document on the bucket after a minimal sanity check.

        The document is stored as the original string. ``expected_bucket_owner`` is
        forwarded to the bucket lookup.

        :raises MalformedPolicy: if the policy is empty, does not start with ``{``, is not
            valid JSON, or parses to an empty document
        """
        _, target_bucket = self._get_cross_account_bucket(
            context,
            bucket,
            expected_bucket_owner=expected_bucket_owner,
        )

        if not policy or policy[0] != "{":
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")

        try:
            parsed_policy = json.loads(policy)
        except ValueError:
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")
        else:
            if not parsed_policy:
                # TODO: add more validation around the policy?
                raise MalformedPolicy("Missing required field Statement")

        target_bucket.policy = policy
4083

4084
    def delete_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Clear the policy of the bucket; ``expected_bucket_owner`` is forwarded to the lookup."""
        _, target_bucket = self._get_cross_account_bucket(
            context,
            bucket,
            expected_bucket_owner=expected_bucket_owner,
        )
        target_bucket.policy = None
4096

4097
    def get_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetBucketAccelerateConfigurationOutput:
        """Return the transfer acceleration status of the bucket.

        The response is empty when no status has been set on the bucket.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        accelerate_status = target_bucket.accelerate_status
        if not accelerate_status:
            return GetBucketAccelerateConfigurationOutput()

        return GetBucketAccelerateConfigurationOutput(Status=accelerate_status)
4112

4113
    def put_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        accelerate_configuration: AccelerateConfiguration,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> None:
        """Set the transfer acceleration status (``Enabled`` / ``Suspended``) of the bucket.

        :raises InvalidRequest: if the bucket name contains a period
        :raises MalformedXML: if ``Status`` is missing or not one of the two allowed values
        """
        # the bucket is resolved first, so a lookup failure wins over the checks below
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if "." in bucket:
            raise InvalidRequest(
                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names"
            )

        requested_status = accelerate_configuration.get("Status")
        # a missing or empty status is not in the tuple either, so one check covers both
        if requested_status not in ("Enabled", "Suspended"):
            raise MalformedXML()

        target_bucket.accelerate_status = requested_status
4136

4137
    def put_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        bucket_logging_status: BucketLoggingStatus,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Enable, update or disable server access logging for the bucket.

        A request without ``LoggingEnabled`` resets the logging configuration to ``{}``.
        Otherwise the target bucket must exist in the same store and share the source
        bucket's region; a missing ``TargetPrefix`` is set to the empty string.

        :raises MalformedXML: if ``TargetBucket`` is missing
        :raises InvalidTargetBucketForLogging: if the target bucket is not found in the store
        :raises CrossLocationLoggingProhibitted: if source and target regions differ
        """
        store, source_bucket = self._get_cross_account_bucket(context, bucket)

        logging_config = bucket_logging_status.get("LoggingEnabled")
        if not logging_config:
            source_bucket.logging = {}
            return

        # the target bucket must be in the same account
        target_bucket_name = logging_config.get("TargetBucket")
        if not target_bucket_name:
            raise MalformedXML()

        if not logging_config.get("TargetPrefix"):
            logging_config["TargetPrefix"] = ""

        # TODO: validate Grants

        target_bucket = store.buckets.get(target_bucket_name)
        if not target_bucket:
            raise InvalidTargetBucketForLogging(
                "The target bucket for logging does not exist",
                TargetBucket=target_bucket_name,
            )

        source_region = source_bucket.bucket_region
        target_region = target_bucket.bucket_region
        if target_region != source_region:
            # the source location is left out of the error when the source is in us-east-1
            if source_region == AWS_REGION_US_EAST_1:
                location_details = {"TargetBucketLocation": target_region}
            else:
                location_details = {
                    "SourceBucketLocation": source_region,
                    "TargetBucketLocation": target_region,
                }
            raise CrossLocationLoggingProhibitted(
                "Cross S3 location logging not allowed. ",
                **location_details,
            )

        source_bucket.logging = logging_config
4184

4185
    def get_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLoggingOutput:
        """
        Return the logging configuration stored on a bucket.

        :param context: The request context.
        :param bucket: The name of the bucket whose logging configuration is returned.
        :param expected_bucket_owner: The expected account ID of the bucket owner, forwarded to the bucket
            lookup.
        :return: An output containing `LoggingEnabled` if a logging configuration is stored on the bucket,
            otherwise an empty output.
        """
        # forward the expected owner to the bucket lookup, as the metrics configuration operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not s3_bucket.logging:
            return GetBucketLoggingOutput()

        return GetBucketLoggingOutput(LoggingEnabled=s3_bucket.logging)
4198

4199
    def put_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        replication_configuration: ReplicationConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        token: ObjectLockToken = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Store a replication configuration on a bucket.

        Rules without an `ID` get a generated one; this mutates the rules of the passed configuration.

        :param context: The request context.
        :param bucket: The name of the bucket receiving the replication configuration.
        :param replication_configuration: The replication configuration to store.
        :param content_md5: Not read by this implementation.
        :param checksum_algorithm: Not read by this implementation.
        :param token: Not read by this implementation.
        :param expected_bucket_owner: The expected account ID of the bucket owner, forwarded to the bucket
            lookup.
        :return: None
        :raises InvalidRequest: If versioning is not enabled on the source bucket, or if the destination bucket
            of a rule is not found in the store or does not have versioning enabled.
        :raises MalformedXML: If the configuration does not contain any rule.
        """
        # forward the expected owner to the bucket lookup, as the metrics configuration operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )
        if s3_bucket.versioning_status != BucketVersioningStatus.Enabled:
            raise InvalidRequest(
                "Versioning must be 'Enabled' on the bucket to apply a replication configuration"
            )

        if not (rules := replication_configuration.get("Rules")):
            raise MalformedXML()

        for rule in rules:
            if "ID" not in rule:
                rule["ID"] = short_uid()

            dest_bucket_arn = rule.get("Destination", {}).get("Bucket")
            dest_bucket_name = s3_bucket_name(dest_bucket_arn)
            dest_s3_bucket = store.buckets.get(dest_bucket_name)
            if (
                not dest_s3_bucket
                or dest_s3_bucket.versioning_status != BucketVersioningStatus.Enabled
            ):
                # according to AWS testing the same exception is raised if the bucket does not exist
                # or if versioning was disabled
                raise InvalidRequest("Destination bucket must have versioning enabled.")

        # TODO more validation on input
        s3_bucket.replication = replication_configuration
4235

4236
    def get_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketReplicationOutput:
        """
        Return the replication configuration stored on a bucket.

        :param context: The request context.
        :param bucket: The name of the bucket whose replication configuration is returned.
        :param expected_bucket_owner: The expected account ID of the bucket owner, forwarded to the bucket
            lookup.
        :return: The replication configuration stored on the bucket.
        :raises ReplicationConfigurationNotFoundError: If no replication configuration is stored on the bucket.
        """
        # forward the expected owner to the bucket lookup, as the metrics configuration operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not s3_bucket.replication:
            raise ReplicationConfigurationNotFoundError(
                "The replication configuration was not found",
                BucketName=bucket,
            )

        return GetBucketReplicationOutput(ReplicationConfiguration=s3_bucket.replication)
4252

4253
    def delete_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Remove the replication configuration stored on a bucket.

        No error is raised if the bucket has no replication configuration.

        :param context: The request context.
        :param bucket: The name of the bucket whose replication configuration is removed.
        :param expected_bucket_owner: The expected account ID of the bucket owner, forwarded to the bucket
            lookup.
        :return: None
        """
        # forward the expected owner to the bucket lookup, as the metrics configuration operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        s3_bucket.replication = None
4263

4264
    @handler("PutBucketAcl", expand=False)
    def put_bucket_acl(
        self,
        context: RequestContext,
        request: PutBucketAclRequest,
    ) -> None:
        """
        Replace the ACL of a bucket with the access control policy derived from the request.

        :param context: The request context; its raw request body is passed to the ACL parsing helper.
        :param request: The unexpanded `PutBucketAcl` request.
        :return: None
        """
        store, s3_bucket = self._get_cross_account_bucket(context, request["Bucket"])
        # the policy is built by the helper from the request and the raw request body
        s3_bucket.acl = get_access_control_policy_from_acl_request(
            request=request,
            owner=s3_bucket.owner,
            request_body=context.request.data,
        )
4276

4277
    def get_bucket_acl(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketAclOutput:
        """
        Return the owner and grants of the ACL stored on a bucket.

        :param context: The request context.
        :param bucket: The name of the bucket whose ACL is returned.
        :param expected_bucket_owner: The expected account ID of the bucket owner, forwarded to the bucket
            lookup.
        :return: The `Owner` and `Grants` entries of the bucket ACL.
        """
        # forward the expected owner to the bucket lookup, as the metrics configuration operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        bucket_acl = s3_bucket.acl
        return GetBucketAclOutput(Owner=bucket_acl["Owner"], Grants=bucket_acl["Grants"])
4287

4288
    @handler("PutObjectAcl", expand=False)
    def put_object_acl(
        self,
        context: RequestContext,
        request: PutObjectAclRequest,
    ) -> PutObjectAclOutput:
        """
        Replace the ACL of an object with the access control policy derived from the request.

        A notification is only triggered when the new ACL differs from the previous one.

        :param context: The request context; its raw request body is passed to the ACL parsing helper.
        :param request: The unexpanded `PutObjectAcl` request.
        :return: An empty `PutObjectAclOutput`.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, request["Bucket"])

        s3_object = s3_bucket.get_object(
            key=request["Key"],
            version_id=request.get("VersionId"),
            http_method="PUT",
        )
        new_acl = get_access_control_policy_from_acl_request(
            request=request,
            owner=s3_object.owner,
            request_body=context.request.data,
        )
        # compare against the ACL in place before it gets overwritten
        acl_changed = s3_object.acl != new_acl
        s3_object.acl = new_acl

        if acl_changed:
            self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        # TODO: RequestCharged
        return PutObjectAclOutput()
4313

4314
    def get_object_acl(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectAclOutput:
        """
        Return the owner and grants of the ACL stored on an object.

        :param context: The request context.
        :param bucket: The name of the bucket containing the object.
        :param key: The key of the object.
        :param version_id: The optional version of the object to look up.
        :param request_payer: Not read by this implementation.
        :param expected_bucket_owner: The expected account ID of the bucket owner, forwarded to the bucket
            lookup.
        :return: The `Owner` and `Grants` entries of the object ACL.
        """
        # forward the expected owner to the bucket lookup, as the metrics configuration operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
        )
        object_acl = s3_object.acl
        # TODO: RequestCharged
        return GetObjectAclOutput(Owner=object_acl["Owner"], Grants=object_acl["Grants"])
4332

4333
    def get_bucket_policy_status(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyStatusOutput:
        """
        Placeholder for the `GetBucketPolicyStatus` operation, which is not implemented.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError()
4341

4342
    def get_object_torrent(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectTorrentOutput:
        """
        Placeholder for the `GetObjectTorrent` operation, which is not implemented.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError()
4352

4353
    def post_object(
        self, context: RequestContext, bucket: BucketName, body: IO[Body] = None, **kwargs
    ) -> PostResponse:
        """
        Handle a form-based upload (`POST Object`) and store the uploaded content as an object of the bucket.

        All fields are read from the `multipart/form-data` form of the request. The content comes from the
        `file` form field if it is present, otherwise from the uploaded `file` part.

        :param context: The request context; its headers, form and files are read directly.
        :param bucket: The name of the bucket receiving the object.
        :param body: Not read by this implementation.
        :return: A `PostResponse` carrying the status code, `ETagHeader` and `LocationHeader`, plus the
            fields of the XML body when the status code is `"201"`.
        :raises PreconditionFailed: If the request `Content-Type` is not `multipart/form-data`.
        :raises InvalidStorageClass: If the `x-amz-storage-class` field is not an accepted storage class.
        :raises InvalidRequest: If a checksum was provided and differs from the checksum of the stored data.
        """
        request_content_type = context.request.headers.get("Content-Type", "")
        if "multipart/form-data" not in request_content_type:
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition="Bucket POST must be of the enclosure-type multipart/form-data",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html
        # TODO: signature validation is not implemented for pre-signed POST
        # policy validation is not implemented either, except expiration and mandatory fields
        # This operation is the only one using form for storing the request data. We will have to do some manual
        # parsing here, as no specs are present for this, as no client directly implements this operation.
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        form = context.request.form
        object_key = form.get("key")

        if "file" in form:
            # in AWS, you can pass the file content as a string in the form field and not as a file object
            payload = to_bytes(form["file"])
            object_content_length = len(payload)
            stream = BytesIO(payload)
        else:
            # this is the default behaviour
            uploaded_file = context.request.files["file"]
            stream = uploaded_file.stream
            # stream is a SpooledTemporaryFile, so we can seek the stream to know its length, necessary for policy
            # validation: seeking to the end (whence=2) returns the total length
            start_offset = stream.tell()
            object_content_length = stream.seek(0, 2)
            # reset the stream and put it back at its original position
            stream.seek(start_offset, 0)

            if "${filename}" in object_key:
                # TODO: ${filename} is actually usable in all form fields
                # See https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/PresignedPost.html
                # > The string ${filename} is automatically replaced with the name of the file provided by the user and
                # is recognized by all form fields.
                object_key = object_key.replace("${filename}", uploaded_file.filename)

        # TODO: see if we need to pass additional metadata not contained in the policy from the table under
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
        validate_post_policy(
            form,
            {
                "bucket": bucket,
                "content_length": object_content_length,
            },
        )

        # without an `acl` field, the object gets the private canned ACL
        if canned_acl := form.get("acl"):
            validate_canned_acl(canned_acl)
        acp = get_canned_acl(canned_acl or BucketCannedACL.private, owner=s3_bucket.owner)

        # form fields that map to system metadata; the dashes are dropped to build the metadata key
        system_metadata = {
            header_name.replace("-", ""): header_value
            for header_name in (
                "Cache-Control",
                "Content-Type",
                "Content-Disposition",
                "Content-Encoding",
            )
            if (header_value := form.get(header_name))
        }
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        user_metadata = {}
        for field in form:
            if field.startswith("x-amz-meta-"):
                user_metadata[field.removeprefix("x-amz-meta-").lower()] = form.get(field)

        tagging = form.get("tagging")
        if tagging:
            # this is weird, as it's direct XML in the form, we need to parse it directly
            tagging = parse_post_object_tagging_xml(tagging)

        storage_class = form.get("x-amz-storage-class")
        if storage_class is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            {
                "ServerSideEncryption": form.get("x-amz-server-side-encryption"),
                "SSEKMSKeyId": form.get("x-amz-server-side-encryption-aws-kms-key-id"),
                "BucketKeyEnabled": form.get("x-amz-server-side-encryption-bucket-key-enabled"),
            },
            s3_bucket,
            store,
        )

        checksum_algorithm = form.get("x-amz-checksum-algorithm")
        checksum_value = None
        if checksum_algorithm:
            checksum_value = form.get(f"x-amz-checksum-{checksum_algorithm.lower()}")

        expires_str = form.get("Expires")
        expires = str_to_rfc_1123_datetime(expires_str) if expires_str else None

        version_id = generate_version_id(s3_bucket.versioning_status)

        s3_object = S3Object(
            key=object_key,
            version_id=version_id,
            storage_class=storage_class,
            expires=expires,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            website_redirect_location=form.get("x-amz-website-redirect-location"),
            acl=acp,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
        )

        with self._storage_backend.open(bucket, s3_object, mode="w") as s3_stored_object:
            s3_stored_object.write(stream)

            if not s3_object.checksum_value:
                s3_object.checksum_value = s3_stored_object.checksum

            elif checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
                # drop the data that was just written before rejecting the request
                self._storage_backend.remove(bucket, s3_object)
                raise InvalidRequest(
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
                )

            s3_bucket.objects.set(object_key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket, object_key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        response = PostResponse()
        # hacky way to set the etag in the headers as well: two locations for one value
        response["ETagHeader"] = s3_object.quoted_etag

        redirect = form.get("success_action_redirect")
        if redirect:
            # we need to create the redirect, as the parser could not return the moto-calculated one
            try:
                response["LocationHeader"] = create_redirect_for_post_request(
                    base_redirect=redirect,
                    bucket=bucket,
                    object_key=object_key,
                    etag=s3_object.quoted_etag,
                )
            except ValueError:
                # If S3 cannot interpret the URL, it acts as if the field is not present.
                response["StatusCode"] = form.get("success_action_status", 204)
            else:
                response["StatusCode"] = 303
        else:
            response["StatusCode"] = form.get("success_action_status") or 204

        # only falls back to the default object location if no redirect location was set above
        response.setdefault(
            "LocationHeader", f"{get_full_default_bucket_location(bucket)}{object_key}"
        )

        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
            response["ChecksumType"] = ChecksumType.FULL_OBJECT

        if s3_bucket.lifecycle_rules and (
            expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            )
        ):
            # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
            #  apply them everytime we get/head an object
            response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        # the status code coming from the form is a string, hence the string comparison
        if response["StatusCode"] == "201":
            # if the StatusCode is 201, S3 returns an XML body with additional information
            response["ETag"] = s3_object.quoted_etag
            response["Bucket"] = bucket
            response["Key"] = object_key
            response["Location"] = response["LocationHeader"]

        return response
4556

4557
    def put_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        metrics_configuration: MetricsConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Add a metrics configuration to a bucket, or overwrite the one already stored under the same `id`.

        A bucket holds at most 1000 metrics configurations: adding a new `id` beyond that limit is rejected,
        while overwriting an existing `id` is always possible.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: Identifies the metrics configuration being added or updated.
        :param metrics_configuration: A new or updated configuration associated with the given metrics identifier.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises TooManyConfigurations: If the bucket already holds 1000 or more metrics configurations AND the
            provided `id` does not already exist.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        configurations = s3_bucket.metric_configurations
        limit_reached = len(configurations) >= 1000
        if limit_reached and id not in configurations:
            raise TooManyConfigurations("Too many metrics configurations")

        configurations[id] = metrics_configuration
4590

4591
    def get_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketMetricsConfigurationOutput:
        """
        Look up the metrics configuration stored on a bucket under a given identifier.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: The unique identifier of the metrics configuration to retrieve.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The metrics configuration associated with the given metrics identifier.
        :raises NoSuchConfiguration: If no (non-empty) configuration is stored under the given identifier.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if metric_config := s3_bucket.metric_configurations.get(id):
            return GetBucketMetricsConfigurationOutput(MetricsConfiguration=metric_config)

        raise NoSuchConfiguration("The specified configuration does not exist.")
4617

4618
    def list_bucket_metrics_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketMetricsConfigurationsOutput:
        """
        List the metrics configurations of a bucket, ordered by identifier, in pages of at most 100 entries.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param continuation_token: An optional continuation token to resume a truncated listing. It is the
            URL-safe base64 encoding of the identifier the page starts at.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The page of metrics configurations, the received continuation token and, if the listing is
            truncated, the token to request the next page with.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        decoded_continuation_token = None
        if continuation_token:
            decoded_continuation_token = to_str(
                base64.urlsafe_b64decode(continuation_token.encode())
            )

        page: list[MetricsConfiguration] = []
        next_continuation_token = None

        ordered_configurations = sorted(
            s3_bucket.metric_configurations.values(), key=itemgetter("Id")
        )
        for metric in ordered_configurations:
            metric_id = metric["Id"]
            # skip everything sorted before the identifier the token points at
            if continuation_token and metric_id < decoded_continuation_token:
                continue

            if len(page) >= 100:
                # the page is full: the first entry left out becomes the start of the next page
                next_continuation_token = to_str(base64.urlsafe_b64encode(metric_id.encode()))
                break

            page.append(metric)

        return ListBucketMetricsConfigurationsOutput(
            IsTruncated=next_continuation_token is not None,
            ContinuationToken=continuation_token,
            NextContinuationToken=next_continuation_token,
            MetricsConfigurationList=page,
        )
4667

4668
    def delete_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Delete the metrics configuration stored on a bucket under a given identifier.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: The unique identifier of the metrics configuration to delete.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises NoSuchConfiguration: If no (non-empty) configuration was stored under the given identifier.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # the removal and the existence check are done in a single step
        if not s3_bucket.metric_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")
4693

4694

4695
def generate_version_id(bucket_versioning_status: str) -> str | None:
1✔
4696
    if not bucket_versioning_status:
1✔
4697
        return None
1✔
4698
    elif bucket_versioning_status.lower() == "enabled":
1✔
4699
        return generate_safe_version_id()
1✔
4700
    else:
4701
        return "null"
1✔
4702

4703

4704
def add_encryption_to_response(response: dict, s3_object: S3Object):
    """
    Copy the encryption settings of an object into a response, in place.

    :param response: the response dict to update
    :param s3_object: the object whose `encryption`, `kms_key_id` and `bucket_key_enabled` values are read
    """
    encryption = s3_object.encryption
    if not encryption:
        return

    response["ServerSideEncryption"] = encryption
    if encryption != ServerSideEncryption.aws_kms:
        return

    # the KMS related fields are only set for objects encrypted with KMS
    response["SSEKMSKeyId"] = s3_object.kms_key_id
    if s3_object.bucket_key_enabled:
        response["BucketKeyEnabled"] = s3_object.bucket_key_enabled
4711

4712

4713
def get_encryption_parameters_from_request_and_bucket(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
    store: S3Store,
) -> EncryptionParameters:
    """
    Resolve the encryption parameters of an object from the request and the encryption rule of the bucket.

    Values from the request take precedence; the encryption rule of the bucket, if any, fills in what the
    request does not provide.

    :param request: the request, read through `.get()` for the encryption related fields
    :param s3_bucket: the bucket the object belongs to
    :param store: the store, used to cache the id of the managed KMS key once it has been created
    :return: the encryption, KMS key id and bucket key flag to apply to the object
    """
    if request.get("SSECustomerKey"):
        # we return early, because ServerSideEncryption does not apply if the request has SSE-C
        return EncryptionParameters(None, None, False)

    encryption = request.get("ServerSideEncryption")
    kms_key_id = request.get("SSEKMSKeyId")
    bucket_key_enabled = request.get("BucketKeyEnabled")

    bucket_rule = s3_bucket.encryption_rule
    if not bucket_rule:
        return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)

    bucket_key_enabled = bucket_key_enabled or bucket_rule.get("BucketKeyEnabled")
    encryption = encryption or bucket_rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]

    if encryption == ServerSideEncryption.aws_kms:
        key_id = kms_key_id or bucket_rule["ApplyServerSideEncryptionByDefault"].get(
            "KMSMasterKeyID"
        )
        kms_key_id = get_kms_key_arn(key_id, s3_bucket.bucket_account_id, s3_bucket.bucket_region)
        if not kms_key_id:
            # if not key is provided, AWS will use an AWS managed KMS key
            # create it if it doesn't already exist, and save it in the store per region
            if not store.aws_managed_kms_key_id:
                store.aws_managed_kms_key_id = create_s3_kms_managed_key_for_region(
                    s3_bucket.bucket_account_id, s3_bucket.bucket_region
                )

            kms_key_id = store.aws_managed_kms_key_id

    return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)
4750

4751

4752
def get_object_lock_parameters_from_bucket_and_request(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
):
    """
    Resolve the object lock parameters of an object from the request and the default retention of the bucket.

    :param request: the request, read through `.get()` for the object lock related fields
    :param s3_bucket: the bucket the object belongs to
    :return: the `ObjectLockParameters` built from the retain-until date, the legal hold status and the
        lock mode
    :raises InvalidArgument: if only one of the lock mode and the retain-until date is supplied, or if the
        lock mode is not part of `OBJECT_LOCK_MODES`
    """
    lock_mode = request.get("ObjectLockMode")
    lock_legal_status = request.get("ObjectLockLegalHoldStatus")
    lock_until = request.get("ObjectLockRetainUntilDate")

    # the mode and the retain-until date only make sense together: report whichever one is missing
    if bool(lock_mode) != bool(lock_until):
        missing_header = (
            "x-amz-object-lock-retain-until-date" if lock_mode else "x-amz-object-lock-mode"
        )
        raise InvalidArgument(
            "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied",
            ArgumentName=missing_header,
        )

    if lock_mode and lock_mode not in OBJECT_LOCK_MODES:
        raise InvalidArgument(
            "Unknown wormMode directive.",
            ArgumentName="x-amz-object-lock-mode",
            ArgumentValue=lock_mode,
        )

    # the default retention of the bucket only applies when the request does not set a mode
    default_retention = s3_bucket.object_lock_default_retention
    if default_retention and not lock_mode:
        lock_mode = default_retention["Mode"]
        lock_until = get_retention_from_now(
            days=default_retention.get("Days"),
            years=default_retention.get("Years"),
        )

    return ObjectLockParameters(lock_until, lock_legal_status, lock_mode)
4786

4787

4788
def get_part_range(s3_object: S3Object, part_number: PartNumber) -> ObjectRange:
    """
    Compute the byte range covered by a given part number of an S3 Object.

    An object without parts is treated as a single part spanning the whole object.

    :param s3_object: S3Object
    :param part_number: the wanted part from the S3Object
    :return: an ObjectRange used to return only a slice of an Object
    :raises InvalidPartNumber: if the object has no such part
    """
    parts = s3_object.parts
    if not parts:
        if part_number > 1:
            raise InvalidPartNumber(
                "The requested partnumber is not satisfiable",
                PartNumberRequested=part_number,
                ActualPartCount=1,
            )
        object_size = s3_object.size
        last_byte = object_size - 1
        return ObjectRange(
            begin=0,
            end=last_byte,
            content_length=object_size,
            content_range=f"bytes 0-{last_byte}/{object_size}",
        )

    part_data = parts.get(part_number)
    if not part_data:
        raise InvalidPartNumber(
            "The requested partnumber is not satisfiable",
            PartNumberRequested=part_number,
            ActualPartCount=len(parts),
        )

    # TODO: remove for next major version 5.0, compatibility for <= 4.5
    if isinstance(part_data, tuple):
        begin, part_length = part_data
    else:
        begin, part_length = part_data["_position"], part_data["Size"]

    end = begin + part_length - 1
    return ObjectRange(
        begin=begin,
        end=end,
        content_length=part_length,
        content_range=f"bytes {begin}-{end}/{s3_object.size}",
    )
4829

4830

4831
def get_acl_headers_from_request(
    request: PutObjectRequest
    | CreateMultipartUploadRequest
    | CopyObjectRequest
    | CreateBucketRequest
    | PutBucketAclRequest
    | PutObjectAclRequest,
) -> list[tuple[str, str]]:
    """
    Collect the grant fields set on a request.

    :param request: the request, read through `.get()` for each of the `Grant*` fields
    :return: a list of `(field name, field value)` tuples for every grant field with a non-empty value, in
        the order `GrantFullControl`, `GrantRead`, `GrantReadACP`, `GrantWrite`, `GrantWriteACP`
    """
    acl_headers = []
    for permission in (
        "GrantFullControl",
        "GrantRead",
        "GrantReadACP",
        "GrantWrite",
        "GrantWriteACP",
    ):
        grant_header = request.get(permission)
        if grant_header:
            acl_headers.append((permission, grant_header))

    return acl_headers
4852

4853

4854
def get_access_control_policy_from_acl_request(
    request: PutBucketAclRequest | PutObjectAclRequest,
    owner: Owner,
    request_body: bytes,
) -> AccessControlPolicy:
    """
    Build the AccessControlPolicy described by an ACL request.

    The ACL can come from exactly one of three places: the canned ``ACL`` field, the ``Grant*``
    fields, or an ``AccessControlPolicy`` carried by the request when a body was sent.

    :param request: the ACL request to read ``ACL``, the ``Grant*`` fields and
        ``AccessControlPolicy`` from
    :param owner: owner used for the canned ACL and for the policy built from the grant fields
    :param request_body: raw request body, only checked for emptiness
    :raises MissingSecurityHeader: if no canned ACL, no grant field and no body were provided
    :raises InvalidRequest: if both a canned ACL and grant fields were provided
    :raises UnexpectedContent: if a body was sent together with a canned ACL or grant fields
    :return: the resulting AccessControlPolicy
    """
    canned = request.get("ACL")
    header_grants = get_acl_headers_from_request(request)

    # FIXME: this is very dirty, but the parser does not differentiate between an empty body and an empty XML node
    # errors are different depending on that data, so we need to access the context. Modifying the parser for this
    # use case seems dangerous
    has_body = request_body

    acl_from_fields = canned or header_grants

    if not (acl_from_fields or has_body):
        raise MissingSecurityHeader(
            "Your request was missing a required header", MissingHeaderName="x-amz-acl"
        )
    if canned and header_grants:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")
    if acl_from_fields and has_body:
        raise UnexpectedContent("This request does not support content")

    if canned:
        validate_canned_acl(canned)
        return get_canned_acl(canned, owner=owner)

    if header_grants:
        grants = []
        for header_name, grantees_values in header_grants:
            grants.extend(
                parse_grants_in_headers(get_permission_from_header(header_name), grantees_values)
            )
        return AccessControlPolicy(Owner=owner, Grants=grants)

    # only the body is left as the source of the ACL
    policy = request.get("AccessControlPolicy")
    validate_acl_acp(policy)
    if owner.get("DisplayName") and policy["Grants"]:
        # only the first grantee gets the owner's display name, and only if it has none yet
        first_grantee = policy["Grants"][0]["Grantee"]
        if "DisplayName" not in first_grantee:
            first_grantee["DisplayName"] = owner["DisplayName"]

    return policy
1✔
4901

4902

4903
def get_access_control_policy_for_new_resource_request(
    request: PutObjectRequest
    | CreateMultipartUploadRequest
    | CopyObjectRequest
    | CreateBucketRequest,
    owner: Owner,
) -> AccessControlPolicy:
    """
    Build the AccessControlPolicy for a resource that is being created.

    If the request carries neither a canned ``ACL`` nor any ``Grant*`` field, the ``private`` canned
    ACL is returned.

    :param request: the creation request to read ``ACL`` and the ``Grant*`` fields from
    :param owner: owner set on the returned policy
    :raises InvalidRequest: if both a canned ACL and grant fields were provided
    :return: the resulting AccessControlPolicy
    """
    # TODO: this is basic ACL, not taking into account Bucket settings. Revisit once we really implement ACLs.
    canned = request.get("ACL")
    header_grants = get_acl_headers_from_request(request)

    if canned and header_grants:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    if canned:
        validate_canned_acl(canned)
        return get_canned_acl(canned, owner=owner)

    if not header_grants:
        return get_canned_acl(BucketCannedACL.private, owner=owner)

    grants = []
    for header_name, grantees_values in header_grants:
        grants.extend(
            parse_grants_in_headers(get_permission_from_header(header_name), grantees_values)
        )

    return AccessControlPolicy(Owner=owner, Grants=grants)
×
4931

4932

4933
def object_exists_for_precondition_write(s3_bucket: S3Bucket, key: ObjectKey) -> bool:
    """
    Tell whether ``key`` currently resolves to an object in the bucket.

    A key whose current entry is a delete marker counts as not existing.

    :param s3_bucket: the bucket whose ``objects`` store is looked up
    :param key: the object key to check
    :return: True if the store holds an entry for the key that is not an S3DeleteMarker,
        False otherwise
    """
    existing = s3_bucket.objects.get(key)
    # coerce explicitly: a bare `existing and ...` would hand back `None` (not `False`) for a
    # missing key, which contradicts the declared `bool` return type
    return bool(existing) and not isinstance(existing, S3DeleteMarker)
1✔
4935

4936

4937
def verify_object_equality_precondition_write(
    s3_bucket: S3Bucket,
    key: ObjectKey,
    etag: str,
    initiated: datetime.datetime | None = None,
) -> None:
    """
    Check that the object currently stored under ``key`` matches the given ETag.

    :param s3_bucket: the bucket whose ``objects`` store is looked up
    :param key: the object key to check
    :param etag: the expected ETag; surrounding double quotes are stripped before comparing
    :param initiated: optional timestamp; if set and older than the object's ``last_modified``,
        the check fails with a conflict
    :raises NoSuchKey: if the key has no entry or its current entry is a delete marker
    :raises PreconditionFailed: if the stored ETag differs from the expected one
    :raises ConditionalRequestConflict: if ``initiated`` is earlier than the object's
        ``last_modified``
    """
    current = s3_bucket.objects.get(key)
    if not current or isinstance(current, S3DeleteMarker):
        raise NoSuchKey("The specified key does not exist.", Key=key)

    expected_etag = etag.strip('"')
    if current.etag != expected_etag:
        raise PreconditionFailed(
            "At least one of the pre-conditions you specified did not hold",
            Condition="If-Match",
        )

    if initiated:
        if initiated < current.last_modified:
            raise ConditionalRequestConflict(
                "The conditional request cannot succeed due to a conflicting operation against this resource.",
                Condition="If-Match",
                Key=key,
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc