localstack / localstack / build 21656696000 (push, github, web-flow)
03 Feb 2026 04:15PM UTC
Commit: Improve types for the SQS store (#13684)

Overall coverage: 86.966% (-0.01%) from 86.976%
25 of 25 new or added lines in 3 files covered. (100.0%)
158 existing lines in 7 files now uncovered.
70552 of 81126 relevant lines covered (86.97%)
0.87 hits per line

Source File: /localstack-core/localstack/services/s3/utils.py
File coverage: 94.73%
import base64
import codecs
import datetime
import hashlib
import itertools
import logging
import re
import time
import zlib
from collections.abc import Mapping
from enum import StrEnum
from secrets import token_bytes
from typing import Any, Literal, NamedTuple, Protocol
from urllib import parse as urlparser
from zoneinfo import ZoneInfo

import xmltodict
from botocore.exceptions import ClientError
from botocore.utils import InvalidArnException

from localstack import config, constants
from localstack.aws.api import CommonServiceException, RequestContext
from localstack.aws.api.s3 import (
    AccessControlPolicy,
    BucketCannedACL,
    BucketName,
    ChecksumAlgorithm,
    ContentMD5,
    CopyObjectRequest,
    CopySource,
    ETag,
    GetObjectRequest,
    Grant,
    Grantee,
    HeadObjectRequest,
    InvalidArgument,
    InvalidLocationConstraint,
    InvalidRange,
    InvalidTag,
    LifecycleExpiration,
    LifecycleRule,
    LifecycleRules,
    Metadata,
    ObjectCannedACL,
    ObjectKey,
    ObjectSize,
    ObjectVersionId,
    Owner,
    Permission,
    PreconditionFailed,
    PutObjectRequest,
    SSEKMSKeyId,
    TaggingHeader,
    TagSet,
    UploadPartCopyRequest,
    UploadPartRequest,
)
from localstack.aws.api.s3 import Type as GranteeType
from localstack.aws.chain import HandlerChain
from localstack.aws.connect import connect_to
from localstack.constants import AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1
from localstack.http import Response
from localstack.services.s3 import checksums
from localstack.services.s3.constants import (
    ALL_USERS_ACL_GRANTEE,
    AUTHENTICATED_USERS_ACL_GRANTEE,
    BUCKET_LOCATION_CONSTRAINTS,
    CHECKSUM_ALGORITHMS,
    EU_WEST_1_LOCATION_CONSTRAINTS,
    LOG_DELIVERY_ACL_GRANTEE,
    SIGNATURE_V2_PARAMS,
    SIGNATURE_V4_PARAMS,
    SYSTEM_METADATA_SETTABLE_HEADERS,
)
from localstack.services.s3.exceptions import (
    IllegalLocationConstraintException,
    InvalidRequest,
    MalformedXML,
)
from localstack.services.s3.headers import decode_header_rfc2047, encode_header_rfc2047
from localstack.utils.aws import arns
from localstack.utils.aws.arns import parse_arn
from localstack.utils.objects import singleton_factory
from localstack.utils.strings import (
    is_base64,
    to_bytes,
    to_str,
)
from localstack.utils.urls import localstack_host

LOG = logging.getLogger(__name__)

BUCKET_NAME_REGEX = (
    r"(?=^.{3,63}$)(?!^(\d+\.)+\d+$)"
    + r"(^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])$)"
)

TAG_REGEX = re.compile(r"^[\w\s.:/=+\-@]*$")


S3_VIRTUAL_HOSTNAME_REGEX = (
    r"(?P<bucket>.*).s3.(?P<region>(?:us-gov|us|ap|ca|cn|eu|sa)-[a-z]+-\d)?.*"
)

_s3_virtual_host_regex = re.compile(S3_VIRTUAL_HOSTNAME_REGEX)


RFC1123 = "%a, %d %b %Y %H:%M:%S GMT"
_gmt_zone_info = ZoneInfo("GMT")


def s3_response_handler(chain: HandlerChain, context: RequestContext, response: Response):
    """
    This response handler takes care of removing certain headers from S3 responses.
    We cannot handle this in the serializer, because the serializer handler calls `Response.update_from`, which does
    not allow you to remove headers, only add them.
    This handler can delete headers from the response.
    """
    # some requests, for example coming from extensions, are flagged as S3 requests. This check confirms that it is
    # indeed truly an S3 request by checking if it parsed properly as an S3 operation
    if not context.service_operation:
        return

    # if AWS returns 204, it will not return a body, Content-Length and Content-Type
    # the web server is already taking care of deleting the body, but it's more explicit to remove it here
    if response.status_code == 204:
        response.data = b""
        response.headers.pop("Content-Type", None)
        response.headers.pop("Content-Length", None)

    elif (
        response.status_code == 200
        and context.request.method == "PUT"
        and response.headers.get("Content-Length") in (0, None)
    ):
        # AWS does not return a Content-Type if the Content-Length is 0
        response.headers.pop("Content-Type", None)


def get_owner_for_account_id(account_id: str):
    """
    This method returns the S3 Owner from the account ID. For now, this is hardcoded as it was in moto, but we can
    then extend it to return different values depending on the account ID
    See https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
    :param account_id: the owner account id
    :return: the Owner object containing the DisplayName and owner ID
    """
    return Owner(
        DisplayName="webfile",  # only in certain regions, see above
        ID="75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a",
    )


def extract_bucket_key_version_id_from_copy_source(
    copy_source: CopySource,
) -> tuple[BucketName, ObjectKey, ObjectVersionId | None]:
    """
    Utility to parse the bucket name, the object key and optionally its versionId. It accepts the CopySource format:
    - <bucket-name>/<object-key>?versionId=<version-id>, used for example in CopySource for CopyObject
    :param copy_source: the S3 CopySource to parse
    :return: parsed BucketName, ObjectKey and optionally VersionId
    """
    copy_source_parsed = urlparser.urlparse(copy_source)
    # we need to manually replace the `+` character with a space character before URL decoding, because different
    # languages don't encode their URLs the same way (%20 vs +), and Python doesn't unquote + into a space char
    parsed_path = urlparser.unquote(copy_source_parsed.path.replace("+", " ")).lstrip("/")

    if "/" not in parsed_path:
        raise InvalidArgument(
            "Invalid copy source object key",
            ArgumentName="x-amz-copy-source",
            ArgumentValue="x-amz-copy-source",
        )
    src_bucket, src_key = parsed_path.split("/", 1)
    src_version_id = urlparser.parse_qs(copy_source_parsed.query).get("versionId", [None])[0]

    return src_bucket, src_key, src_version_id

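# Usage sketch (illustrative only, not part of the upstream module), assuming the CopySource
# value arrives URL-encoded as it does on the wire:
#
#   >>> extract_bucket_key_version_id_from_copy_source("my-bucket/some/key.txt?versionId=abc123")
#   ('my-bucket', 'some/key.txt', 'abc123')
#   >>> extract_bucket_key_version_id_from_copy_source("my-bucket/prefix%2Bname.txt")
#   ('my-bucket', 'prefix+name.txt', None)
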

class ChecksumHash(Protocol):
    """
    This Protocol allows proper typing for the different kinds of hashes used by S3 (hashlib.shaX, zlib.crc32 from
    S3CRC32Checksum, and botocore CrtCrc32cChecksum).
    """

    def digest(self) -> bytes: ...

    def update(self, value: bytes): ...


def get_s3_checksum_algorithm_from_request(
    request: PutObjectRequest | UploadPartRequest,
) -> ChecksumAlgorithm | None:
    checksum_algorithm: list[ChecksumAlgorithm] = [
        algo for algo in CHECKSUM_ALGORITHMS if request.get(f"Checksum{algo}")
    ]
    if not checksum_algorithm:
        return None

    if len(checksum_algorithm) > 1:
        raise InvalidRequest(
            "Expecting a single x-amz-checksum- header. Multiple checksum Types are not allowed."
        )

    return checksum_algorithm[0]


def get_s3_checksum_algorithm_from_trailing_headers(
    trailing_headers: str,
) -> ChecksumAlgorithm | None:
    checksum_algorithm: list[ChecksumAlgorithm] = [
        algo for algo in CHECKSUM_ALGORITHMS if f"x-amz-checksum-{algo.lower()}" in trailing_headers
    ]
    if not checksum_algorithm:
        return None

    if len(checksum_algorithm) > 1:
        raise InvalidRequest(
            "Expecting a single x-amz-checksum- header. Multiple checksum Types are not allowed."
        )

    return checksum_algorithm[0]


def get_s3_checksum(algorithm) -> ChecksumHash:
    match algorithm:
        case ChecksumAlgorithm.CRC32:
            return S3CRC32Checksum()

        case ChecksumAlgorithm.CRC32C:
            from botocore.httpchecksum import CrtCrc32cChecksum

            return CrtCrc32cChecksum()

        case ChecksumAlgorithm.CRC64NVME:
            from botocore.httpchecksum import CrtCrc64NvmeChecksum

            return CrtCrc64NvmeChecksum()

        case ChecksumAlgorithm.SHA1:
            return hashlib.sha1(usedforsecurity=False)

        case ChecksumAlgorithm.SHA256:
            return hashlib.sha256(usedforsecurity=False)

        case _:
            # TODO: check proper error? for now validated client side, need to check server response
            raise InvalidRequest("The value specified in the x-amz-trailer header is not supported")


class S3CRC32Checksum:
    """Implements a unified way of using zlib.crc32 compatible with hashlib.sha and botocore CrtCrc32cChecksum"""

    __slots__ = ["checksum"]

    def __init__(self):
        self.checksum = zlib.crc32(b"")

    def update(self, value: bytes):
        self.checksum = zlib.crc32(value, self.checksum)

    def digest(self) -> bytes:
        return self.checksum.to_bytes(4, "big")

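# Usage sketch (illustrative only, not part of the upstream module). The incremental updates
# are equivalent to a single zlib.crc32 pass over the whole payload, and S3 exchanges the
# digest base64-encoded (e.g. as an x-amz-checksum-crc32 value):
#
#   >>> crc = S3CRC32Checksum()
#   >>> crc.update(b"hello ")
#   >>> crc.update(b"world")
#   >>> crc.digest() == zlib.crc32(b"hello world").to_bytes(4, "big")
#   True
#   >>> trailer_value = base64.b64encode(crc.digest()).decode()
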

class CombinedCrcHash:
    def __init__(self, checksum_type: ChecksumAlgorithm):
        match checksum_type:
            case ChecksumAlgorithm.CRC32:
                func = checksums.combine_crc32
            case ChecksumAlgorithm.CRC32C:
                func = checksums.combine_crc32c
            case ChecksumAlgorithm.CRC64NVME:
                func = checksums.combine_crc64_nvme
            case _:
                raise ValueError("You cannot combine SHA based checksums")

        self.combine_function = func
        self.checksum = b""

    def combine(self, value: bytes, object_len: int):
        if not self.checksum:
            self.checksum = value
            return

        self.checksum = self.combine_function(self.checksum, value, object_len)

    def digest(self):
        return self.checksum


class ObjectRange(NamedTuple):
    """
    NamedTuple representing a parsed Range header with the requested S3 object size
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range
    """

    content_range: str  # the original Range header
    content_length: int  # the full requested object size
    begin: int  # the start of the range
    end: int  # the end of the range


def parse_range_header(range_header: str, object_size: int) -> ObjectRange | None:
    """
    Takes a Range header, and returns a dataclass containing the necessary information to return only a slice of an
    S3 object. If the range header is invalid, we return None so that the request is treated as a regular request.
    :param range_header: a Range header
    :param object_size: the requested S3 object total size
    :return: ObjectRange or None if the Range header is invalid
    """
    last = object_size - 1
    try:
        _, rspec = range_header.split("=")
    except ValueError:
        return None
    if "," in rspec:
        return None

    try:
        begin, end = [int(i) if i else None for i in rspec.split("-")]
    except ValueError:
        # if we can't parse the Range header, S3 just treats the request as a non-range request
        return None

    if (begin is None and end == 0) or (begin is not None and begin > last):
        raise InvalidRange(
            "The requested range is not satisfiable",
            ActualObjectSize=str(object_size),
            RangeRequested=range_header,
        )

    if begin is not None:  # byte range
        end = last if end is None else min(end, last)
    elif end is not None:  # suffix byte range
        begin = object_size - min(end, object_size)
        end = last
    else:
        # Treat as non-range request
        return None

    if begin > min(end, last):
        # Treat as non-range request if after the logic is applied
        return None

    return ObjectRange(
        content_range=f"bytes {begin}-{end}/{object_size}",
        content_length=end - begin + 1,
        begin=begin,
        end=end,
    )

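# Usage sketch (illustrative only, not part of the upstream module) for a 10-byte object:
#
#   >>> parse_range_header("bytes=0-4", object_size=10)
#   ObjectRange(content_range='bytes 0-4/10', content_length=5, begin=0, end=4)
#   >>> parse_range_header("bytes=-3", object_size=10)  # suffix range: the last 3 bytes
#   ObjectRange(content_range='bytes 7-9/10', content_length=3, begin=7, end=9)
#   >>> parse_range_header("not-a-range", object_size=10) is None
#   True
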

def parse_copy_source_range_header(copy_source_range: str, object_size: int) -> ObjectRange:
    """
    Takes a CopySourceRange parameter, and returns a dataclass containing the necessary information to return only a
    slice of an S3 object. The validation is much stricter than `parse_range_header`
    :param copy_source_range: a CopySourceRange parameter for UploadPartCopy
    :param object_size: the requested S3 object total size
    :raises InvalidArgument: if the CopySourceRange parameter does not pass validation
    :return: ObjectRange
    """
    last = object_size - 1
    try:
        _, rspec = copy_source_range.split("=")
    except ValueError:
        raise InvalidArgument(
            "The x-amz-copy-source-range value must be of the form bytes=first-last where first and last are the zero-based offsets of the first and last bytes to copy",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )
    if "," in rspec:
        raise InvalidArgument(
            "The x-amz-copy-source-range value must be of the form bytes=first-last where first and last are the zero-based offsets of the first and last bytes to copy",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )

    try:
        begin, end = [int(i) if i else None for i in rspec.split("-")]
    except ValueError:
        # if we can't parse the range, the request is rejected with InvalidArgument
        raise InvalidArgument(
            "The x-amz-copy-source-range value must be of the form bytes=first-last where first and last are the zero-based offsets of the first and last bytes to copy",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )

    if begin is None or end is None or begin > end:
        raise InvalidArgument(
            "The x-amz-copy-source-range value must be of the form bytes=first-last where first and last are the zero-based offsets of the first and last bytes to copy",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )

    if begin > last:
        raise InvalidRequest(
            "The specified copy range is invalid for the source object size",
        )
    elif end > last:
        raise InvalidArgument(
            f"Range specified is not valid for source object of size: {object_size}",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )

    return ObjectRange(
        content_range=f"bytes {begin}-{end}/{object_size}",
        content_length=end - begin + 1,
        begin=begin,
        end=end,
    )


def get_failed_upload_part_copy_source_preconditions(
    request: UploadPartCopyRequest, last_modified: datetime.datetime, etag: ETag
) -> str | None:
    """
    Utility which parses the preconditions from an S3 UploadPartCopy request.
    Note: the order in which these conditions are checked matters when they are used in conjunction

    :param UploadPartCopyRequest request: The S3 UploadPartCopy request.
    :param datetime last_modified: The time the source object was last modified.
    :param ETag etag: The ETag of the source object.

    :returns: The name of the failed precondition.
    """
    if_match = request.get("CopySourceIfMatch")
    if_none_match = request.get("CopySourceIfNoneMatch")
    if_unmodified_since = request.get("CopySourceIfUnmodifiedSince")
    if_modified_since = request.get("CopySourceIfModifiedSince")
    last_modified = second_resolution_datetime(last_modified)

    if if_match:
        if if_match.strip('"') != etag.strip('"'):
            return "x-amz-copy-source-If-Match"
        if if_modified_since and if_modified_since > last_modified:
            return "x-amz-copy-source-If-Modified-Since"
        # CopySourceIfMatch is unaffected by CopySourceIfUnmodifiedSince so return early
        if if_unmodified_since:
            return None

    if if_unmodified_since and second_resolution_datetime(if_unmodified_since) < last_modified:
        return "x-amz-copy-source-If-Unmodified-Since"

    if if_none_match and if_none_match.strip('"') == etag.strip('"'):
        return "x-amz-copy-source-If-None-Match"

    if if_modified_since and last_modified <= second_resolution_datetime(
        if_modified_since
    ) < datetime.datetime.now(tz=_gmt_zone_info):
        return "x-amz-copy-source-If-Modified-Since"


def get_full_default_bucket_location(bucket_name: BucketName) -> str:
    host_definition = localstack_host()
    if host_definition.host != constants.LOCALHOST_HOSTNAME:
        # the user has customised their LocalStack hostname, and may not support subdomains.
        # Return the location in path form.
        return f"{config.get_protocol()}://{host_definition.host_and_port()}/{bucket_name}/"
    else:
        return f"{config.get_protocol()}://{bucket_name}.s3.{host_definition.host_and_port()}/"


def get_url_encoded_object_location(bucket_name: BucketName, object_key: str) -> str:
    return f"{get_full_default_bucket_location(bucket_name)}{urlparser.quote(object_key)}"


def etag_to_base_64_content_md5(etag: ETag) -> str:
    """
    Convert an ETag, representing an MD5 hexdigest (which might be quoted), to its base64 encoded representation
    :param etag: an ETag, might be quoted
    :return: the base64 value
    """
    # get the bytes digest from the hexdigest
    byte_digest = codecs.decode(to_bytes(etag.strip('"')), "hex")
    return to_str(base64.b64encode(byte_digest))


def base_64_content_md5_to_etag(content_md5: ContentMD5) -> str | None:
    """
    Convert a ContentMD5 header, representing a base64 encoded MD5 binary digest, to its ETag value, hex encoded
    :param content_md5: a ContentMD5 header, base64 encoded
    :return: the ETag value, a hex coded MD5 digest, or None if the input is not valid base64 or not the
    representation of an MD5 hash
    """
    if not is_base64(content_md5):
        return None
    # get the hexdigest from the bytes digest
    byte_digest = base64.b64decode(content_md5)
    hex_digest = to_str(codecs.encode(byte_digest, "hex"))
    if len(hex_digest) != 32:
        return None

    return hex_digest

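# Round-trip sketch (illustrative only, not part of the upstream module) showing the
# relationship between the hex ETag form and the base64 Content-MD5 form:
#
#   >>> etag = hashlib.md5(b"hello", usedforsecurity=False).hexdigest()
#   >>> content_md5 = etag_to_base_64_content_md5(etag)
#   >>> base_64_content_md5_to_etag(content_md5) == etag
#   True
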

def is_presigned_url_request(context: RequestContext) -> bool:
    """
    Detects a pre-signed URL from the query string parameters.
    Returns True if any kind of presigned URL query string parameter is encountered
    :param context: the request context from the handler chain
    """
    # Detecting pre-sign url and checking signature
    query_parameters = context.request.args
    return any(p in query_parameters for p in SIGNATURE_V2_PARAMS) or any(
        p in query_parameters for p in SIGNATURE_V4_PARAMS
    )


def is_bucket_name_valid(bucket_name: str) -> bool:
    """
    ref. https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
    """
    return True if re.match(BUCKET_NAME_REGEX, bucket_name) else False


def get_permission_header_name(permission: Permission) -> str:
    return f"x-amz-grant-{permission.replace('_', '-').lower()}"


def get_permission_from_header(capitalized_field: str) -> Permission:
    headers_parts = [part.upper() for part in re.split(r"([A-Z][a-z]+)", capitalized_field) if part]
    return "_".join(headers_parts[1:])


def is_valid_canonical_id(canonical_id: str) -> bool:
    """
    Validate that the string is a hex string with 64 chars
    """
    try:
        return int(canonical_id, 16) and len(canonical_id) == 64
    except ValueError:
        return False


def uses_host_addressing(headers: Mapping[str, str]) -> str | None:
1✔
540
    """
541
    Determines if the request is targeting S3 with virtual host addressing
542
    :param headers: the request headers
543
    :return: if the request targets S3 with virtual host addressing, returns the bucket name else None
544
    """
545
    host = headers.get("host", "")
1✔
546

547
    # try to extract the bucket from the hostname (the "in" check is a minor optimization, as the regex is very greedy)
548
    if ".s3." in host and (
1✔
549
        (match := _s3_virtual_host_regex.match(host)) and (bucket_name := match.group("bucket"))
550
    ):
551
        return bucket_name
1✔
552

553
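# Usage sketch (illustrative only, not part of the upstream module):
#
#   >>> uses_host_addressing({"host": "my-bucket.s3.us-east-1.amazonaws.com"})
#   'my-bucket'
#   >>> uses_host_addressing({"host": "s3.amazonaws.com"}) is None
#   True
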

def get_class_attrs_from_spec_class(spec_class: type[StrEnum]) -> set[str]:
    return {str(spec) for spec in spec_class}


def get_system_metadata_from_request(request: dict) -> Metadata:
    metadata: Metadata = {}

    for system_metadata_field in SYSTEM_METADATA_SETTABLE_HEADERS:
        if field_value := request.get(system_metadata_field):
            metadata[system_metadata_field] = field_value

    return metadata


def encode_user_metadata(metadata: Metadata) -> Metadata:
    """Encode the user metadata in the RFC 2047 format if necessary so that it can be returned in HTTP headers"""
    return {k: encode_header_rfc2047(v) for k, v in metadata.items()}


def decode_user_metadata(metadata: Metadata | None) -> Metadata:
    """Decode the user metadata if provided in the RFC 2047 format, or leave it as is if not. AWS also lowercases the
    metadata keys"""
    if not metadata:
        return {}

    return {k.lower(): decode_header_rfc2047(v) for k, v in metadata.items()}


def extract_bucket_name_and_key_from_headers_and_path(
    headers: dict[str, str], path: str
) -> tuple[str | None, str | None]:
    """
    Extract the bucket name and the object key from the request headers and path. This works with both virtual host
    and path style requests.
    :param headers: the request headers, used to get the Host
    :param path: the request path
    :return: if found, the bucket name and object key
    """
    bucket_name = None
    object_key = None
    host = headers.get("host", "")
    if ".s3" in host:
        vhost_match = _s3_virtual_host_regex.match(host)
        if vhost_match and vhost_match.group("bucket"):
            bucket_name = vhost_match.group("bucket") or None
            split = path.split("/", maxsplit=1)
            if len(split) > 1 and split[1]:
                object_key = split[1]
    else:
        path_without_params = path.partition("?")[0]
        split = path_without_params.split("/", maxsplit=2)
        bucket_name = split[1] or None
        if len(split) > 2:
            object_key = split[2]

    return bucket_name, object_key

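# Behaviour sketch (illustrative only, not part of the upstream module), assuming the default
# localhost.localstack.cloud hostname for the virtual-host case:
#
#   >>> extract_bucket_name_and_key_from_headers_and_path(
#   ...     {"host": "my-bucket.s3.localhost.localstack.cloud:4566"}, "/my/key.txt")
#   ('my-bucket', 'my/key.txt')
#   >>> extract_bucket_name_and_key_from_headers_and_path(
#   ...     {"host": "localhost:4566"}, "/my-bucket/my/key.txt?x-id=GetObject")
#   ('my-bucket', 'my/key.txt')
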

def normalize_bucket_name(bucket_name):
    bucket_name = bucket_name or ""
    bucket_name = bucket_name.lower()
    return bucket_name


def get_bucket_and_key_from_s3_uri(s3_uri: str) -> tuple[str, str]:
    """
    Extracts the bucket name and key from an S3 URI
    """
    output_bucket, _, output_key = s3_uri.removeprefix("s3://").partition("/")
    return output_bucket, output_key


def get_bucket_and_key_from_presign_url(presign_url: str) -> tuple[str, str]:
    """
    Extracts the bucket name and key from an S3 presigned URL
    """
    parsed_url = urlparser.urlparse(presign_url)
    bucket = parsed_url.path.split("/")[1]
    key = "/".join(parsed_url.path.split("/")[2:]).split("?")[0]
    return bucket, key


def capitalize_header_name_from_snake_case(header_name: str) -> str:
    return "-".join([part.capitalize() for part in header_name.split("-")])


def header_name_from_capitalized_param(param_name: str) -> str:
    return "-".join(re.findall("[A-Z][^A-Z]*", param_name)).lower()


def get_kms_key_arn(kms_key: str, account_id: str, bucket_region: str) -> str | None:
    """
    In S3, the KMS key can be passed as a KeyId or a KeyArn. This method always returns the key ARN from either form.
    It can also validate that the key is in the same region, and raise an exception otherwise.
    :param kms_key: the KMS key id or ARN
    :param account_id: the bucket account id
    :param bucket_region: the bucket region
    :raise KMS.NotFoundException if the key is not in the same region
    :return: the key ARN if found and enabled
    """
    if not kms_key:
        return None
    try:
        parsed_arn = parse_arn(kms_key)
        key_region = parsed_arn["region"]
        # the KMS key should be in the same region as the bucket, we can raise an exception without calling KMS
        if bucket_region and key_region != bucket_region:
            raise CommonServiceException(
                code="KMS.NotFoundException", message=f"Invalid arn {key_region}"
            )

    except InvalidArnException:
        # if it fails, the passed ID is a UUID with no region data
        key_id = kms_key
        # recreate the ARN manually with the bucket region and bucket owner
        # if the KMS key is cross-account, the user should provide an ARN and not a KeyId
        kms_key = arns.kms_key_arn(key_id=key_id, account_id=account_id, region_name=bucket_region)

    return kms_key


# TODO: replace Any by a replacement for S3Bucket, some kind of defined type?
def validate_kms_key_id(kms_key: str, bucket: Any) -> None:
    """
    Validate that the KMS key used to encrypt the object is valid
    :param kms_key: the KMS key id or ARN
    :param bucket: the targeted bucket
    :raise KMS.DisabledException if the key is disabled
    :raise KMS.NotFoundException if the key is not in the same region or does not exist
    :return: the key ARN if found and enabled
    """
    if hasattr(bucket, "region_name"):
        bucket_region = bucket.region_name
    else:
        bucket_region = bucket.bucket_region

    if hasattr(bucket, "account_id"):
        bucket_account_id = bucket.account_id
    else:
        bucket_account_id = bucket.bucket_account_id

    kms_key_arn = get_kms_key_arn(kms_key, bucket_account_id, bucket_region)

    # the KMS key should be in the same region as the bucket, create the client in the bucket region
    kms_client = connect_to(region_name=bucket_region).kms
    try:
        key = kms_client.describe_key(KeyId=kms_key_arn)
        if not key["KeyMetadata"]["Enabled"]:
            if key["KeyMetadata"]["KeyState"] == "PendingDeletion":
                raise CommonServiceException(
                    code="KMS.KMSInvalidStateException",
                    message=f"{key['KeyMetadata']['Arn']} is pending deletion.",
                )
            raise CommonServiceException(
                code="KMS.DisabledException", message=f"{key['KeyMetadata']['Arn']} is disabled."
            )

    except ClientError as e:
        if e.response["Error"]["Code"] == "NotFoundException":
            raise CommonServiceException(
                code="KMS.NotFoundException", message=e.response["Error"]["Message"]
            )
        raise


def create_s3_kms_managed_key_for_region(account_id: str, region_name: str) -> SSEKMSKeyId:
    kms_client = connect_to(aws_access_key_id=account_id, region_name=region_name).kms
    key = kms_client.create_key(
        Description="Default key that protects my S3 objects when no other key is defined"
    )

    return key["KeyMetadata"]["Arn"]


def rfc_1123_datetime(src: datetime.datetime) -> str:
    return src.strftime(RFC1123)


def str_to_rfc_1123_datetime(value: str) -> datetime.datetime:
    return datetime.datetime.strptime(value, RFC1123).replace(tzinfo=_gmt_zone_info)


def second_resolution_datetime(src: datetime.datetime) -> datetime.datetime:
    return src.replace(microsecond=0)


def add_expiration_days_to_datetime(user_datatime: datetime.datetime, exp_days: int) -> str:
    """
    This adds expiration days to a datetime, rounding to the next day at midnight UTC.
    :param user_datatime: datetime object
    :param exp_days: provided days
    :return: a datetime object rounded to midnight, formatted as an RFC 1123 string
    """
    rounded_datetime = user_datatime.replace(
        hour=0, minute=0, second=0, microsecond=0
    ) + datetime.timedelta(days=exp_days + 1)

    return rfc_1123_datetime(rounded_datetime)

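# Worked example (illustrative only, not part of the upstream module): the rounding starts
# from midnight of the given day and adds Days + 1, so a 2-day expiration for an object
# last modified on 2024-01-03 10:30 UTC becomes midnight on 2024-01-06:
#
#   >>> add_expiration_days_to_datetime(
#   ...     datetime.datetime(2024, 1, 3, 10, 30, tzinfo=_gmt_zone_info), 2)
#   'Sat, 06 Jan 2024 00:00:00 GMT'
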

def serialize_expiration_header(
    rule_id: str, lifecycle_exp: LifecycleExpiration, last_modified: datetime.datetime
):
    if exp_days := lifecycle_exp.get("Days"):
        # AWS rounds to the next day at midnight UTC
        exp_date = add_expiration_days_to_datetime(last_modified, exp_days)
    else:
        exp_date = rfc_1123_datetime(lifecycle_exp["Date"])

    return f'expiry-date="{exp_date}", rule-id="{rule_id}"'


def get_lifecycle_rule_from_object(
    lifecycle_conf_rules: LifecycleRules,
    object_key: ObjectKey,
    size: ObjectSize,
    object_tags: dict[str, str],
) -> LifecycleRule:
    for rule in lifecycle_conf_rules:
        if not (expiration := rule.get("Expiration")) or "ExpiredObjectDeleteMarker" in expiration:
            continue

        if not (rule_filter := rule.get("Filter")):
            return rule

        if and_rules := rule_filter.get("And"):
            if all(
                _match_lifecycle_filter(key, value, object_key, size, object_tags)
                for key, value in and_rules.items()
            ):
                return rule

        if any(
            _match_lifecycle_filter(key, value, object_key, size, object_tags)
            for key, value in rule_filter.items()
        ):
            # after validation, we can only have one of `Prefix`, `Tag`, `ObjectSizeGreaterThan` or
            # `ObjectSizeLessThan` in the dict. Instead of manually checking, we can iterate over the only key and
            # try to match it
            return rule


def _match_lifecycle_filter(
    filter_key: str,
    filter_value: str | int | dict[str, str],
    object_key: ObjectKey,
    size: ObjectSize,
    object_tags: dict[str, str],
):
    match filter_key:
        case "Prefix":
            return object_key.startswith(filter_value)
        case "Tag":
            return object_tags and object_tags.get(filter_value.get("Key")) == filter_value.get(
                "Value"
            )
        case "ObjectSizeGreaterThan":
            return size > filter_value
        case "ObjectSizeLessThan":
            return size < filter_value
        case "Tags":  # this is inside the `And` field
            return object_tags and all(
                object_tags.get(tag.get("Key")) == tag.get("Value") for tag in filter_value
            )


def parse_expiration_header(
    expiration_header: str,
) -> tuple[datetime.datetime | None, str | None]:
    try:
        header_values = dict(
            (p.strip('"') for p in v.split("=")) for v in expiration_header.split('", ')
        )
        expiration_date = str_to_rfc_1123_datetime(header_values["expiry-date"])
        return expiration_date, header_values["rule-id"]

    except (IndexError, ValueError, KeyError):
        return None, None

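# Round-trip sketch (illustrative only, not part of the upstream module):
# serialize_expiration_header and parse_expiration_header are inverse operations.
#
#   >>> hdr = serialize_expiration_header(
#   ...     "clean-up-rule",
#   ...     {"Days": 7},
#   ...     datetime.datetime(2024, 1, 3, 10, 30, tzinfo=_gmt_zone_info),
#   ... )
#   >>> hdr
#   'expiry-date="Thu, 11 Jan 2024 00:00:00 GMT", rule-id="clean-up-rule"'
#   >>> parse_expiration_header(hdr)[1]
#   'clean-up-rule'
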

def validate_dict_fields(data: dict, required_fields: set, optional_fields: set = None):
    """
    Validate whether the `data` dict contains at least the required fields and not more than the union of the required
    and optional fields
    TODO: we could pass the TypedDict to also use its required/optional properties, but it could be sensitive to
     mistakes/changes in the specs and not always right
    :param data: the dict we want to validate
    :param required_fields: a set containing the required fields
    :param optional_fields: a set containing the optional fields
    :return: bool, whether the dict is valid or not
    """
    if optional_fields is None:
        optional_fields = set()
    return (set_fields := set(data)) >= required_fields and set_fields <= (
        required_fields | optional_fields
    )

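# Usage sketch (illustrative only, not part of the upstream module):
#
#   >>> validate_dict_fields({"Key": "a", "Value": "b"}, required_fields={"Key", "Value"})
#   True
#   >>> validate_dict_fields({"Key": "a", "Extra": 1}, required_fields={"Key"}, optional_fields={"Value"})
#   False
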

def parse_tagging_header(tagging_header: TaggingHeader) -> dict:
    try:
        parsed_tags = urlparser.parse_qs(tagging_header, keep_blank_values=True)
        tags: dict[str, str] = {}
        for key, val in parsed_tags.items():
            if len(val) != 1 or not TAG_REGEX.match(key) or not TAG_REGEX.match(val[0]):
                raise InvalidArgument(
                    "The header 'x-amz-tagging' shall be encoded as UTF-8 then URLEncoded URL query parameters without tag name duplicates.",
                    ArgumentName="x-amz-tagging",
                    ArgumentValue=tagging_header,
                )
            elif key.startswith("aws:"):
                raise
            tags[key] = val[0]
        return tags

    except ValueError:
        raise InvalidArgument(
            "The header 'x-amz-tagging' shall be encoded as UTF-8 then URLEncoded URL query parameters without tag name duplicates.",
            ArgumentName="x-amz-tagging",
            ArgumentValue=tagging_header,
        )


def validate_tag_set(
    tag_set: TagSet, type_set: Literal["bucket", "object", "create-bucket"] = "bucket"
):
    keys = set()
    for tag in tag_set:
        if set(tag) != {"Key", "Value"}:
            raise MalformedXML()

        key = tag["Key"]
        value = tag["Value"]

        if key is None or value is None:
            raise MalformedXML()

        if key in keys:
            raise InvalidTag(
                "Cannot provide multiple Tags with the same key",
                TagKey=key,
            )

        if key.startswith("aws:"):
            if type_set == "bucket":
                message = "System tags cannot be added/updated by requester"
            elif type_set == "object":
                message = "Your TagKey cannot be prefixed with aws:"
            else:
                message = 'User-defined tag keys can\'t start with "aws:". This prefix is reserved for system tags. Remove "aws:" from your tag keys and try again.'
            raise InvalidTag(
                message,
                # weirdly, AWS does not return the `TagKey` field here, but it does if the TagKey does not match the
                # regex in the next step
                TagKey=key if type_set != "create-bucket" else None,
            )

        if not TAG_REGEX.match(key):
            raise InvalidTag(
                "The TagKey you have provided is invalid",
                TagKey=key,
            )
        elif not TAG_REGEX.match(value):
            raise InvalidTag(
                "The TagValue you have provided is invalid", TagKey=key, TagValue=value
            )

        keys.add(key)


def validate_location_constraint(context_region: str, location_constraint: str) -> None:
    if location_constraint:
        if context_region == AWS_REGION_US_EAST_1:
            if (
                not config.ALLOW_NONSTANDARD_REGIONS
                and location_constraint not in BUCKET_LOCATION_CONSTRAINTS
            ):
                raise InvalidLocationConstraint(
                    "The specified location-constraint is not valid",
                    LocationConstraint=location_constraint,
                )
        elif context_region == AWS_REGION_EU_WEST_1:
            if location_constraint not in EU_WEST_1_LOCATION_CONSTRAINTS:
                raise IllegalLocationConstraintException(location_constraint)
        elif context_region != location_constraint:
            raise IllegalLocationConstraintException(location_constraint)
    else:
        if context_region != AWS_REGION_US_EAST_1:
            raise IllegalLocationConstraintException("unspecified")


def get_unique_key_id(
    bucket: BucketName, object_key: ObjectKey, version_id: ObjectVersionId
) -> str:
    return f"{bucket}/{object_key}/{version_id or 'null'}"


def get_retention_from_now(days: int = None, years: int = None) -> datetime.datetime:
    """
    This calculates a retention date from now, adding days or years to it
    :param days: provided days
    :param years: provided years, exclusive with days
    :return: return a datetime object
    """
    if not days and not years:
        raise ValueError("Either 'days' or 'years' needs to be provided")
    now = datetime.datetime.now(tz=_gmt_zone_info)
    if days:
        retention = now + datetime.timedelta(days=days)
    else:
        retention = now.replace(year=now.year + years)

    return retention


def get_failed_precondition_copy_source(
    request: CopyObjectRequest, last_modified: datetime.datetime, etag: ETag
) -> str | None:
    """
    Validate whether the source object LastModified and ETag match a precondition, and if they do, return the failed
    precondition
    # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
    :param request: the CopyObjectRequest
    :param last_modified: source object LastModified
    :param etag: source object ETag
    :return str: the failed precondition to raise
    """
    last_modified = second_resolution_datetime(last_modified)
    if (cs_if_match := request.get("CopySourceIfMatch")) and etag.strip('"') != cs_if_match.strip(
        '"'
    ):
        return "x-amz-copy-source-If-Match"

    elif (
        cs_if_unmodified_since := request.get("CopySourceIfUnmodifiedSince")
    ) and last_modified > second_resolution_datetime(cs_if_unmodified_since):
        return "x-amz-copy-source-If-Unmodified-Since"

    elif (cs_if_none_match := request.get("CopySourceIfNoneMatch")) and etag.strip(
        '"'
    ) == cs_if_none_match.strip('"'):
        return "x-amz-copy-source-If-None-Match"

    elif (
        cs_if_modified_since := request.get("CopySourceIfModifiedSince")
    ) and last_modified <= second_resolution_datetime(cs_if_modified_since) < datetime.datetime.now(
        tz=_gmt_zone_info
    ):
        return "x-amz-copy-source-If-Modified-Since"


def validate_failed_precondition(
    request: GetObjectRequest | HeadObjectRequest, last_modified: datetime.datetime, etag: ETag
) -> None:
    """
    Validate whether the object LastModified and ETag match a precondition, and if they do, raise the failed
    precondition
    :param request: the GetObjectRequest or HeadObjectRequest
    :param last_modified: S3 object LastModified
    :param etag: S3 object ETag
    :raises PreconditionFailed
    :raises NotModified, 304 with an empty body
    """
    precondition_failed = None
    # last_modified needs to be rounded to a second so that strict equality can be enforced from a RFC1123 header
    last_modified = second_resolution_datetime(last_modified)
    if (if_match := request.get("IfMatch")) and etag != if_match.strip('"'):
        precondition_failed = "If-Match"

    elif (
        if_unmodified_since := request.get("IfUnmodifiedSince")
    ) and last_modified > second_resolution_datetime(if_unmodified_since):
        precondition_failed = "If-Unmodified-Since"

    if precondition_failed:
        raise PreconditionFailed(
            "At least one of the pre-conditions you specified did not hold",
            Condition=precondition_failed,
        )

    if ((if_none_match := request.get("IfNoneMatch")) and etag == if_none_match.strip('"')) or (
        (if_modified_since := request.get("IfModifiedSince"))
        and last_modified
        <= second_resolution_datetime(if_modified_since)
        < datetime.datetime.now(tz=_gmt_zone_info)
    ):
        raise CommonServiceException(
            message="Not Modified",
            code="NotModified",
            status_code=304,
        )


def get_canned_acl(
    canned_acl: BucketCannedACL | ObjectCannedACL, owner: Owner
) -> AccessControlPolicy:
    """
    Return the proper Owner and Grants from a CannedACL
    See https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl
    :param canned_acl: an S3 CannedACL
    :param owner: the current owner of the bucket or object
    :return: an AccessControlPolicy containing the Grants and Owner
    """
    owner_grantee = Grantee(**owner, Type=GranteeType.CanonicalUser)
    grants = [Grant(Grantee=owner_grantee, Permission=Permission.FULL_CONTROL)]

    match canned_acl:
        case ObjectCannedACL.private:
            pass  # no other permissions
        case ObjectCannedACL.public_read:
            grants.append(Grant(Grantee=ALL_USERS_ACL_GRANTEE, Permission=Permission.READ))

        case ObjectCannedACL.public_read_write:
            grants.append(Grant(Grantee=ALL_USERS_ACL_GRANTEE, Permission=Permission.READ))
            grants.append(Grant(Grantee=ALL_USERS_ACL_GRANTEE, Permission=Permission.WRITE))
        case ObjectCannedACL.authenticated_read:
            grants.append(
                Grant(Grantee=AUTHENTICATED_USERS_ACL_GRANTEE, Permission=Permission.READ)
            )
        case ObjectCannedACL.bucket_owner_read:
            pass  # TODO: bucket owner ACL
        case ObjectCannedACL.bucket_owner_full_control:
            pass  # TODO: bucket owner ACL
        case ObjectCannedACL.aws_exec_read:
            pass  # TODO: bucket owner, EC2 Read
        case BucketCannedACL.log_delivery_write:
            grants.append(Grant(Grantee=LOG_DELIVERY_ACL_GRANTEE, Permission=Permission.READ_ACP))
            grants.append(Grant(Grantee=LOG_DELIVERY_ACL_GRANTEE, Permission=Permission.WRITE))

    return AccessControlPolicy(Owner=owner, Grants=grants)


def create_redirect_for_post_request(
    base_redirect: str, bucket: BucketName, object_key: ObjectKey, etag: ETag
):
    """
    POST requests can redirect if successful. It will take the URL provided and append query string parameters
    (key, bucket and ETag). It needs to be a full URL.
    :param base_redirect: the URL provided for redirection
    :param bucket: bucket name
    :param object_key: object key
    :param etag: key ETag
    :return: the URL provided with the new appended query string parameters
    """
    parts = urlparser.urlparse(base_redirect)
    if not parts.netloc:
        raise ValueError("The provided URL is not valid")
    queryargs = urlparser.parse_qs(parts.query)
    queryargs["key"] = [object_key]
    queryargs["bucket"] = [bucket]
    queryargs["etag"] = [etag]
    redirect_queryargs = urlparser.urlencode(queryargs, doseq=True)
    newparts = (
        parts.scheme,
        parts.netloc,
        parts.path,
        parts.params,
        redirect_queryargs,
        parts.fragment,
    )
    return urlparser.urlunparse(newparts)


def parse_post_object_tagging_xml(tagging: str) -> dict | None:
    try:
        tag_set = {}
        tags = xmltodict.parse(tagging)
        xml_tags = tags.get("Tagging", {}).get("TagSet", {}).get("Tag", [])
        if not xml_tags:
            # if the Tagging does not respect the schema, just return
            return
        if not isinstance(xml_tags, list):
            xml_tags = [xml_tags]
        for tag in xml_tags:
            tag_set[tag["Key"]] = tag["Value"]

        return tag_set

    except Exception:
        raise MalformedXML()


def generate_safe_version_id() -> str:
    """
    Generate a safe version id for XML rendering.
    VersionId cannot have `-` in it, as it fails in XML.
    Combine an ever-increasing part in the first 8 characters with a random element.
    We need the sequence part in order to properly implement pagination around ListObjectVersions.
    By prefixing the version-id with a globally increasing number, we can sort the versions
    :return: an S3 VersionId containing a timestamp part in the first 8 characters
    """
    tok = next(global_version_id_sequence()).to_bytes(length=6) + token_bytes(18)
    return base64.b64encode(tok, altchars=b"._").rstrip(b"=").decode("ascii")


@singleton_factory
def global_version_id_sequence():
    start = int(time.time() * 1000)
    # itertools.count is thread safe over the GIL since its getAndIncrement operation is a single python bytecode op
    return itertools.count(start)


def is_version_older_than_other(version_id: str, other: str):
    """
    Compare the sequence part of a VersionId against the sequence part of a VersionIdMarker. Used for pagination
    See `generate_safe_version_id`
    """
    return base64.b64decode(version_id, altchars=b"._") < base64.b64decode(other, altchars=b"._")

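# Ordering sketch (illustrative only, not part of the upstream module): because the first
# bytes of every version id come from the same monotonically increasing counter, an id
# generated earlier always compares as "older":
#
#   >>> first = generate_safe_version_id()
#   >>> second = generate_safe_version_id()
#   >>> is_version_older_than_other(first, second)
#   True
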

def get_bucket_location_xml(location_constraint: str) -> str:
    """
    Returns the formatted XML for the GetBucketLocation operation.

    :param location_constraint: The location constraint to return in the XML. It can be an empty string when
    it's not specified in the bucket configuration.
    :return: The XML response.
    """

    return (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/"'
        + ("/>" if not location_constraint else f">{location_constraint}</LocationConstraint>")
    )