localstack / localstack, build 19950811432 (push, github, web-flow)
04 Dec 2025 07:06PM UTC coverage: 86.904% (+0.007%) from 86.897%
Lambda: Keep track of scaling config in store (#13463)

1 of 1 new or added line in 1 file covered. (100.0%)
165 existing lines in 7 files now uncovered.
69796 of 80314 relevant lines covered (86.9%)
0.87 hits per line

Source File
/localstack-core/localstack/services/s3/utils.py (94.61% covered)
import base64
import codecs
import datetime
import hashlib
import itertools
import logging
import re
import time
import zlib
from collections.abc import Mapping
from enum import StrEnum
from secrets import token_bytes
from typing import Any, Literal, NamedTuple, Protocol
from urllib import parse as urlparser
from zoneinfo import ZoneInfo

import xmltodict
from botocore.exceptions import ClientError
from botocore.utils import InvalidArnException

from localstack import config, constants
from localstack.aws.api import CommonServiceException, RequestContext
from localstack.aws.api.s3 import (
    AccessControlPolicy,
    BucketCannedACL,
    BucketName,
    ChecksumAlgorithm,
    ContentMD5,
    CopyObjectRequest,
    CopySource,
    ETag,
    GetObjectRequest,
    Grant,
    Grantee,
    HeadObjectRequest,
    InvalidArgument,
    InvalidLocationConstraint,
    InvalidRange,
    InvalidTag,
    LifecycleExpiration,
    LifecycleRule,
    LifecycleRules,
    Metadata,
    ObjectCannedACL,
    ObjectKey,
    ObjectSize,
    ObjectVersionId,
    Owner,
    Permission,
    PreconditionFailed,
    PutObjectRequest,
    SSEKMSKeyId,
    TaggingHeader,
    TagSet,
    UploadPartCopyRequest,
    UploadPartRequest,
)
from localstack.aws.api.s3 import Type as GranteeType
from localstack.aws.chain import HandlerChain
from localstack.aws.connect import connect_to
from localstack.constants import AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1
from localstack.http import Response
from localstack.services.s3 import checksums
from localstack.services.s3.constants import (
    ALL_USERS_ACL_GRANTEE,
    AUTHENTICATED_USERS_ACL_GRANTEE,
    BUCKET_LOCATION_CONSTRAINTS,
    CHECKSUM_ALGORITHMS,
    EU_WEST_1_LOCATION_CONSTRAINTS,
    LOG_DELIVERY_ACL_GRANTEE,
    SIGNATURE_V2_PARAMS,
    SIGNATURE_V4_PARAMS,
    SYSTEM_METADATA_SETTABLE_HEADERS,
)
from localstack.services.s3.exceptions import (
    IllegalLocationConstraintException,
    InvalidRequest,
    MalformedXML,
)
from localstack.utils.aws import arns
from localstack.utils.aws.arns import parse_arn
from localstack.utils.objects import singleton_factory
from localstack.utils.strings import (
    is_base64,
    to_bytes,
    to_str,
)
from localstack.utils.urls import localstack_host

LOG = logging.getLogger(__name__)

BUCKET_NAME_REGEX = (
    r"(?=^.{3,63}$)(?!^(\d+\.)+\d+$)"
    + r"(^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])$)"
)

TAG_REGEX = re.compile(r"^[\w\s.:/=+\-@]*$")


S3_VIRTUAL_HOSTNAME_REGEX = (
    r"(?P<bucket>.*).s3.(?P<region>(?:us-gov|us|ap|ca|cn|eu|sa)-[a-z]+-\d)?.*"
)

_s3_virtual_host_regex = re.compile(S3_VIRTUAL_HOSTNAME_REGEX)


RFC1123 = "%a, %d %b %Y %H:%M:%S GMT"
_gmt_zone_info = ZoneInfo("GMT")


def s3_response_handler(chain: HandlerChain, context: RequestContext, response: Response):
    """
    This response handler takes care of removing certain headers from S3 responses.
    We cannot handle this in the serializer, because the serializer handler calls `Response.update_from`, which does
    not allow you to remove headers, only add them.
    This handler can delete headers from the response.
    """
    # some requests, for example coming from extensions, are flagged as S3 requests. This check confirms that it is
    # truly an S3 request by checking if it parsed properly as an S3 operation
    if not context.service_operation:
        return

    # if AWS returns 204, it will not return a body, Content-Length or Content-Type
    # the web server is already taking care of deleting the body, but it's more explicit to remove it here
    if response.status_code == 204:
        response.data = b""
        response.headers.pop("Content-Type", None)
        response.headers.pop("Content-Length", None)

    elif (
        response.status_code == 200
        and context.request.method == "PUT"
        and response.headers.get("Content-Length") in (0, None)
    ):
        # AWS does not return a Content-Type if the Content-Length is 0
        response.headers.pop("Content-Type", None)


def get_owner_for_account_id(account_id: str):
    """
    This method returns the S3 Owner from the account id. For now, this is hardcoded as it was in moto, but we can
    later extend it to return different values depending on the account ID
    See https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
    :param account_id: the owner account id
    :return: the Owner object containing the DisplayName and owner ID
    """
    return Owner(
        DisplayName="webfile",  # only in certain regions, see above
        ID="75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a",
    )


def extract_bucket_key_version_id_from_copy_source(
    copy_source: CopySource,
) -> tuple[BucketName, ObjectKey, ObjectVersionId | None]:
    """
    Utility to parse bucket name, object key and optionally its versionId. It accepts the CopySource format:
    - <bucket-name>/<object-key>?versionId=<version-id>, used for example in CopySource for CopyObject
    :param copy_source: the S3 CopySource to parse
    :return: parsed BucketName, ObjectKey and optionally VersionId
    """
    copy_source_parsed = urlparser.urlparse(copy_source)
    # we need to manually replace the `+` character with a space character before URL decoding, because different
    # languages don't encode their URLs the same way (%20 vs +), and Python doesn't unquote + into a space char
    parsed_path = urlparser.unquote(copy_source_parsed.path.replace("+", " ")).lstrip("/")

    if "/" not in parsed_path:
        raise InvalidArgument(
            "Invalid copy source object key",
            ArgumentName="x-amz-copy-source",
            ArgumentValue="x-amz-copy-source",
        )
    src_bucket, src_key = parsed_path.split("/", 1)
    src_version_id = urlparser.parse_qs(copy_source_parsed.query).get("versionId", [None])[0]

    return src_bucket, src_key, src_version_id
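

# Illustrative example (not part of the original module): a minimal sketch of how the
# CopySource value is parsed. The inputs are made up; the results simply follow the logic above.
def _demo_extract_copy_source():
    # "+" is decoded as a space, and the versionId query parameter is optional
    assert extract_bucket_key_version_id_from_copy_source(
        "source-bucket/some+folder/file.txt?versionId=abc123"
    ) == ("source-bucket", "some folder/file.txt", "abc123")
    assert extract_bucket_key_version_id_from_copy_source("source-bucket/key") == (
        "source-bucket",
        "key",
        None,
    )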


class ChecksumHash(Protocol):
    """
    This Protocol allows proper typing for the different kinds of hashes used by S3 (hashlib.shaX, zlib.crc32 from
    S3CRC32Checksum, and botocore CrtCrc32cChecksum).
    """

    def digest(self) -> bytes: ...

    def update(self, value: bytes): ...


def get_s3_checksum_algorithm_from_request(
    request: PutObjectRequest | UploadPartRequest,
) -> ChecksumAlgorithm | None:
    checksum_algorithm: list[ChecksumAlgorithm] = [
        algo for algo in CHECKSUM_ALGORITHMS if request.get(f"Checksum{algo}")
    ]
    if not checksum_algorithm:
        return None

    if len(checksum_algorithm) > 1:
        raise InvalidRequest(
            "Expecting a single x-amz-checksum- header. Multiple checksum Types are not allowed."
        )

    return checksum_algorithm[0]


def get_s3_checksum_algorithm_from_trailing_headers(
    trailing_headers: str,
) -> ChecksumAlgorithm | None:
    checksum_algorithm: list[ChecksumAlgorithm] = [
        algo for algo in CHECKSUM_ALGORITHMS if f"x-amz-checksum-{algo.lower()}" in trailing_headers
    ]
    if not checksum_algorithm:
        return None

    if len(checksum_algorithm) > 1:
        raise InvalidRequest(
            "Expecting a single x-amz-checksum- header. Multiple checksum Types are not allowed."
        )

    return checksum_algorithm[0]


def get_s3_checksum(algorithm) -> ChecksumHash:
    match algorithm:
        case ChecksumAlgorithm.CRC32:
            return S3CRC32Checksum()

        case ChecksumAlgorithm.CRC32C:
            from botocore.httpchecksum import CrtCrc32cChecksum

            return CrtCrc32cChecksum()

        case ChecksumAlgorithm.CRC64NVME:
            from botocore.httpchecksum import CrtCrc64NvmeChecksum

            return CrtCrc64NvmeChecksum()

        case ChecksumAlgorithm.SHA1:
            return hashlib.sha1(usedforsecurity=False)

        case ChecksumAlgorithm.SHA256:
            return hashlib.sha256(usedforsecurity=False)

        case _:
            # TODO: check proper error? for now validated client side, need to check server response
            raise InvalidRequest("The value specified in the x-amz-trailer header is not supported")


class S3CRC32Checksum:
    """Implements a unified way of using zlib.crc32 compatible with hashlib.sha and botocore CrtCrc32cChecksum"""

    __slots__ = ["checksum"]

    def __init__(self):
        self.checksum = zlib.crc32(b"")

    def update(self, value: bytes):
        self.checksum = zlib.crc32(value, self.checksum)

    def digest(self) -> bytes:
        return self.checksum.to_bytes(4, "big")
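

# Illustrative example (not part of the original module): a minimal sketch showing that
# S3CRC32Checksum mirrors hashlib's update()/digest() interface, and how the 4-byte digest
# maps to the base64 value carried in an x-amz-checksum-crc32 header.
def _demo_crc32_checksum():
    hasher = get_s3_checksum(ChecksumAlgorithm.CRC32)
    hasher.update(b"hello ")
    hasher.update(b"world")
    # incremental updates are equivalent to hashing the whole payload at once
    assert hasher.digest() == zlib.crc32(b"hello world").to_bytes(4, "big")
    # the header value is the base64-encoded 4-byte digest
    return base64.b64encode(hasher.digest()).decode()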


class CombinedCrcHash:
    def __init__(self, checksum_type: ChecksumAlgorithm):
        match checksum_type:
            case ChecksumAlgorithm.CRC32:
                func = checksums.combine_crc32
            case ChecksumAlgorithm.CRC32C:
                func = checksums.combine_crc32c
            case ChecksumAlgorithm.CRC64NVME:
                func = checksums.combine_crc64_nvme
            case _:
                raise ValueError("You cannot combine SHA based checksums")

        self.combine_function = func
        self.checksum = b""

    def combine(self, value: bytes, object_len: int):
        if not self.checksum:
            self.checksum = value
            return

        self.checksum = self.combine_function(self.checksum, value, object_len)

    def digest(self):
        return self.checksum


class ObjectRange(NamedTuple):
    """
    NamedTuple representing a parsed Range header with the requested S3 object size
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range
    """

    content_range: str  # the Content-Range value, e.g. "bytes 0-9/100"
    content_length: int  # the length of the requested slice
    begin: int  # the start of the range
    end: int  # the end of the range


def parse_range_header(range_header: str, object_size: int) -> ObjectRange | None:
    """
    Takes a Range header, and returns a dataclass containing the necessary information to return only a slice of an
    S3 object. If the range header is invalid, we return None so that the request is treated as a regular request.
    :param range_header: a Range header
    :param object_size: the requested S3 object total size
    :return: ObjectRange or None if the Range header is invalid
    """
    last = object_size - 1
    try:
        _, rspec = range_header.split("=")
    except ValueError:
        return None
    if "," in rspec:
        return None

    try:
        begin, end = [int(i) if i else None for i in rspec.split("-")]
    except ValueError:
        # if we can't parse the Range header, S3 just treats the request as a non-range request
        return None

    if (begin is None and end == 0) or (begin is not None and begin > last):
        raise InvalidRange(
            "The requested range is not satisfiable",
            ActualObjectSize=str(object_size),
            RangeRequested=range_header,
        )

    if begin is not None:  # byte range
        end = last if end is None else min(end, last)
    elif end is not None:  # suffix byte range
        begin = object_size - min(end, object_size)
        end = last
    else:
        # Treat as non-range request
        return None

    if begin > min(end, last):
        # Treat as a non-range request if the range is empty after the logic above is applied
        return None

    return ObjectRange(
        content_range=f"bytes {begin}-{end}/{object_size}",
        content_length=end - begin + 1,
        begin=begin,
        end=end,
    )
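

# Illustrative example (not part of the original module): a minimal sketch of how suffix,
# open-ended and oversized ranges are resolved for a 10-byte object.
def _demo_parse_range_header():
    assert parse_range_header("bytes=2-5", 10).content_length == 4
    assert parse_range_header("bytes=-3", 10).begin == 7  # suffix range: the last 3 bytes
    assert parse_range_header("bytes=4-", 10).end == 9  # open-ended range stops at the last byte
    assert parse_range_header("bytes=0-999", 10).end == 9  # end is clamped to the object size
    assert parse_range_header("not-a-range", 10) is None  # invalid header, treated as a regular request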


def parse_copy_source_range_header(copy_source_range: str, object_size: int) -> ObjectRange:
    """
    Takes a CopySourceRange parameter, and returns a dataclass containing the necessary information to return only a slice of an
    S3 object. The validation is much stricter than `parse_range_header`
    :param copy_source_range: a CopySourceRange parameter for UploadCopyPart
    :param object_size: the requested S3 object total size
    :raises InvalidArgument: if the CopySourceRange parameter does not pass validation
    :return: ObjectRange
    """
    last = object_size - 1
    try:
        _, rspec = copy_source_range.split("=")
    except ValueError:
        raise InvalidArgument(
            "The x-amz-copy-source-range value must be of the form bytes=first-last where first and last are the zero-based offsets of the first and last bytes to copy",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )
    if "," in rspec:
        raise InvalidArgument(
            "The x-amz-copy-source-range value must be of the form bytes=first-last where first and last are the zero-based offsets of the first and last bytes to copy",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )

    try:
        begin, end = [int(i) if i else None for i in rspec.split("-")]
    except ValueError:
        # unlike the Range header, an unparsable CopySourceRange is rejected
        raise InvalidArgument(
            "The x-amz-copy-source-range value must be of the form bytes=first-last where first and last are the zero-based offsets of the first and last bytes to copy",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )

    if begin is None or end is None or begin > end:
        raise InvalidArgument(
            "The x-amz-copy-source-range value must be of the form bytes=first-last where first and last are the zero-based offsets of the first and last bytes to copy",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )

    if begin > last:
        # the range starts past the end of the source object
        raise InvalidRequest(
            "The specified copy range is invalid for the source object size",
        )
    elif end > last:
        raise InvalidArgument(
            f"Range specified is not valid for source object of size: {object_size}",
            ArgumentName="x-amz-copy-source-range",
            ArgumentValue=copy_source_range,
        )

    return ObjectRange(
        content_range=f"bytes {begin}-{end}/{object_size}",
        content_length=end - begin + 1,
        begin=begin,
        end=end,
    )


def get_failed_upload_part_copy_source_preconditions(
    request: UploadPartCopyRequest, last_modified: datetime.datetime, etag: ETag
) -> str | None:
    """
    Utility which parses the conditions from an S3 UploadPartCopy request.
    Note: the order in which these conditions are checked matters when they are used in conjunction

    :param UploadPartCopyRequest request: The S3 UploadPartCopy request.
    :param datetime last_modified: The time the source object was last modified.
    :param ETag etag: The ETag of the source object.

    :returns: The name of the failed precondition.
    """
    if_match = request.get("CopySourceIfMatch")
    if_none_match = request.get("CopySourceIfNoneMatch")
    if_unmodified_since = request.get("CopySourceIfUnmodifiedSince")
    if_modified_since = request.get("CopySourceIfModifiedSince")
    last_modified = second_resolution_datetime(last_modified)

    if if_match:
        if if_match.strip('"') != etag.strip('"'):
            return "x-amz-copy-source-If-Match"
        if if_modified_since and if_modified_since > last_modified:
            return "x-amz-copy-source-If-Modified-Since"
        # CopySourceIfMatch is unaffected by CopySourceIfUnmodifiedSince so return early
        if if_unmodified_since:
            return None

    if if_unmodified_since and second_resolution_datetime(if_unmodified_since) < last_modified:
        return "x-amz-copy-source-If-Unmodified-Since"

    if if_none_match and if_none_match.strip('"') == etag.strip('"'):
        return "x-amz-copy-source-If-None-Match"

    if if_modified_since and last_modified <= second_resolution_datetime(
        if_modified_since
    ) < datetime.datetime.now(tz=_gmt_zone_info):
        return "x-amz-copy-source-If-Modified-Since"


def get_full_default_bucket_location(bucket_name: BucketName) -> str:
    host_definition = localstack_host()
    if host_definition.host != constants.LOCALHOST_HOSTNAME:
        # the user has customised their LocalStack hostname, and may not support subdomains.
        # Return the location in path form.
        return f"{config.get_protocol()}://{host_definition.host_and_port()}/{bucket_name}/"
    else:
        return f"{config.get_protocol()}://{bucket_name}.s3.{host_definition.host_and_port()}/"


def etag_to_base_64_content_md5(etag: ETag) -> str:
    """
    Convert an ETag, representing an MD5 hexdigest (might be quoted), to its base64 encoded representation
    :param etag: an ETag, might be quoted
    :return: the base64 value
    """
    # get the bytes digest from the hexdigest
    byte_digest = codecs.decode(to_bytes(etag.strip('"')), "hex")
    return to_str(base64.b64encode(byte_digest))


def base_64_content_md5_to_etag(content_md5: ContentMD5) -> str | None:
    """
    Convert a ContentMD5 header, representing a base64 encoded MD5 binary digest, to its ETag value, hex encoded
    :param content_md5: a ContentMD5 header, base64 encoded
    :return: the ETag value, a hex encoded MD5 digest, or None if the input is not valid base64 or not the
    representation of an MD5 hash
    """
    if not is_base64(content_md5):
        return None
    # get the hexdigest from the bytes digest
    byte_digest = base64.b64decode(content_md5)
    hex_digest = to_str(codecs.encode(byte_digest, "hex"))
    if len(hex_digest) != 32:
        return None

    return hex_digest
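

# Illustrative example (not part of the original module): a minimal round-trip sketch
# between an (optionally quoted) MD5-based ETag and the Content-MD5 header value.
def _demo_etag_content_md5_roundtrip():
    etag = hashlib.md5(b"some object data", usedforsecurity=False).hexdigest()
    content_md5 = etag_to_base_64_content_md5(f'"{etag}"')  # surrounding quotes are stripped
    assert base_64_content_md5_to_etag(content_md5) == etag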


def is_presigned_url_request(context: RequestContext) -> bool:
    """
    Detects a pre-signed URL from its query string parameters
    Returns True if any kind of presigned URL query string parameter is encountered
    :param context: the request context from the handler chain
    """
    # Detecting pre-sign url and checking signature
    query_parameters = context.request.args
    return any(p in query_parameters for p in SIGNATURE_V2_PARAMS) or any(
        p in query_parameters for p in SIGNATURE_V4_PARAMS
    )


def is_bucket_name_valid(bucket_name: str) -> bool:
    """
    ref. https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
    """
    return True if re.match(BUCKET_NAME_REGEX, bucket_name) else False
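

# Illustrative example (not part of the original module): a few checks against
# BUCKET_NAME_REGEX, following the S3 bucket naming rules linked above.
def _demo_is_bucket_name_valid():
    assert is_bucket_name_valid("my-bucket-1")
    assert not is_bucket_name_valid("My_Bucket")  # uppercase and underscores are rejected
    assert not is_bucket_name_valid("ab")  # shorter than 3 characters
    assert not is_bucket_name_valid("192.168.0.1")  # must not look like an IP address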


def get_permission_header_name(permission: Permission) -> str:
    return f"x-amz-grant-{permission.replace('_', '-').lower()}"


def get_permission_from_header(capitalized_field: str) -> Permission:
    headers_parts = [part.upper() for part in re.split(r"([A-Z][a-z]+)", capitalized_field) if part]
    return "_".join(headers_parts[1:])
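

# Illustrative example (not part of the original module): mapping between a Permission
# value, the corresponding grant header name, and the capitalized request field.
def _demo_permission_header_mapping():
    assert get_permission_header_name("FULL_CONTROL") == "x-amz-grant-full-control"
    assert get_permission_from_header("GrantFullControl") == "FULL_CONTROL"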


def is_valid_canonical_id(canonical_id: str) -> bool:
    """
    Validate that the string is a hex string with 64 chars
    """
    try:
        return int(canonical_id, 16) and len(canonical_id) == 64
    except ValueError:
        return False


def uses_host_addressing(headers: Mapping[str, str]) -> str | None:
    """
    Determines if the request is targeting S3 with virtual host addressing
    :param headers: the request headers
    :return: if the request targets S3 with virtual host addressing, returns the bucket name else None
    """
    host = headers.get("host", "")

    # try to extract the bucket from the hostname (the "in" check is a minor optimization, as the regex is very greedy)
    if ".s3." in host and (
        (match := _s3_virtual_host_regex.match(host)) and (bucket_name := match.group("bucket"))
    ):
        return bucket_name
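

# Illustrative example (not part of the original module): virtual-host vs path-style
# Host headers. The hostnames used here are only examples.
def _demo_uses_host_addressing():
    assert uses_host_addressing({"host": "my-bucket.s3.eu-west-1.amazonaws.com"}) == "my-bucket"
    assert uses_host_addressing({"host": "s3.amazonaws.com"}) is None  # path-style request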


def get_class_attrs_from_spec_class(spec_class: type[StrEnum]) -> set[str]:
    return {str(spec) for spec in spec_class}


def get_system_metadata_from_request(request: dict) -> Metadata:
    metadata: Metadata = {}

    for system_metadata_field in SYSTEM_METADATA_SETTABLE_HEADERS:
        if field_value := request.get(system_metadata_field):
            metadata[system_metadata_field] = field_value

    return metadata


def extract_bucket_name_and_key_from_headers_and_path(
    headers: dict[str, str], path: str
) -> tuple[str | None, str | None]:
    """
    Extract the bucket name and the object key from the request headers and path. This works with both virtual host
    and path style requests.
    :param headers: the request headers, used to get the Host
    :param path: the request path
    :return: if found, the bucket name and object key
    """
    bucket_name = None
    object_key = None
    host = headers.get("host", "")
    if ".s3" in host:
        vhost_match = _s3_virtual_host_regex.match(host)
        if vhost_match and vhost_match.group("bucket"):
            bucket_name = vhost_match.group("bucket") or None
            split = path.split("/", maxsplit=1)
            if len(split) > 1 and split[1]:
                object_key = split[1]
    else:
        path_without_params = path.partition("?")[0]
        split = path_without_params.split("/", maxsplit=2)
        bucket_name = split[1] or None
        if len(split) > 2:
            object_key = split[2]

    return bucket_name, object_key
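

# Illustrative example (not part of the original module): the same bucket and key extracted
# from a path-style and a virtual-host-style request. Hostnames are only examples.
def _demo_extract_bucket_name_and_key():
    assert extract_bucket_name_and_key_from_headers_and_path(
        {"host": "localhost:4566"}, "/my-bucket/path/to/key?versionId=1"
    ) == ("my-bucket", "path/to/key")
    assert extract_bucket_name_and_key_from_headers_and_path(
        {"host": "my-bucket.s3.localhost.localstack.cloud:4566"}, "/path/to/key"
    ) == ("my-bucket", "path/to/key")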


def normalize_bucket_name(bucket_name):
    bucket_name = bucket_name or ""
    bucket_name = bucket_name.lower()
    return bucket_name


def get_bucket_and_key_from_s3_uri(s3_uri: str) -> tuple[str, str]:
    """
    Extracts the bucket name and key from an S3 URI
    """
    output_bucket, _, output_key = s3_uri.removeprefix("s3://").partition("/")
    return output_bucket, output_key


def get_bucket_and_key_from_presign_url(presign_url: str) -> tuple[str, str]:
    """
    Extracts the bucket name and key from an S3 presigned URL
    """
    parsed_url = urlparser.urlparse(presign_url)
    bucket = parsed_url.path.split("/")[1]
    key = "/".join(parsed_url.path.split("/")[2:]).split("?")[0]
    return bucket, key


def capitalize_header_name_from_snake_case(header_name: str) -> str:
    return "-".join([part.capitalize() for part in header_name.split("-")])


def get_kms_key_arn(kms_key: str, account_id: str, bucket_region: str) -> str | None:
    """
    In S3, the KMS key can be passed as a KeyId or a KeyArn. This method always returns the KeyArn from either.
    It can also validate if the key is in the same region, and raise an exception.
    :param kms_key: the KMS key id or ARN
    :param account_id: the bucket account id
    :param bucket_region: the bucket region
    :raise KMS.NotFoundException if the key is not in the same region
    :return: the key ARN if found and enabled
    """
    if not kms_key:
        return None
    try:
        parsed_arn = parse_arn(kms_key)
        key_region = parsed_arn["region"]
        # the KMS key should be in the same region as the bucket, we can raise an exception without calling KMS
        if bucket_region and key_region != bucket_region:
            raise CommonServiceException(
                code="KMS.NotFoundException", message=f"Invalid arn {key_region}"
            )

    except InvalidArnException:
        # if parsing fails, the passed ID is a UUID with no region data
        key_id = kms_key
        # recreate the ARN manually with the bucket region and bucket owner
        # if the KMS key is cross-account, the user should provide an ARN and not a KeyId
        kms_key = arns.kms_key_arn(key_id=key_id, account_id=account_id, region_name=bucket_region)

    return kms_key


# TODO: replace Any by a replacement for S3Bucket, some kind of defined type?
def validate_kms_key_id(kms_key: str, bucket: Any) -> None:
    """
    Validate that the KMS key used to encrypt the object is valid
    :param kms_key: the KMS key id or ARN
    :param bucket: the targeted bucket
    :raise KMS.DisabledException if the key is disabled
    :raise KMS.NotFoundException if the key is not in the same region or does not exist
    :return: None
    """
    if hasattr(bucket, "region_name"):
        bucket_region = bucket.region_name
    else:
        bucket_region = bucket.bucket_region

    if hasattr(bucket, "account_id"):
        bucket_account_id = bucket.account_id
    else:
        bucket_account_id = bucket.bucket_account_id

    kms_key_arn = get_kms_key_arn(kms_key, bucket_account_id, bucket_region)

    # the KMS key should be in the same region as the bucket, create the client in the bucket region
    kms_client = connect_to(region_name=bucket_region).kms
    try:
        key = kms_client.describe_key(KeyId=kms_key_arn)
        if not key["KeyMetadata"]["Enabled"]:
            if key["KeyMetadata"]["KeyState"] == "PendingDeletion":
                raise CommonServiceException(
                    code="KMS.KMSInvalidStateException",
                    message=f"{key['KeyMetadata']['Arn']} is pending deletion.",
                )
            raise CommonServiceException(
                code="KMS.DisabledException", message=f"{key['KeyMetadata']['Arn']} is disabled."
            )

    except ClientError as e:
        if e.response["Error"]["Code"] == "NotFoundException":
            raise CommonServiceException(
                code="KMS.NotFoundException", message=e.response["Error"]["Message"]
            )
        raise


def create_s3_kms_managed_key_for_region(account_id: str, region_name: str) -> SSEKMSKeyId:
    kms_client = connect_to(aws_access_key_id=account_id, region_name=region_name).kms
    key = kms_client.create_key(
        Description="Default key that protects my S3 objects when no other key is defined"
    )

    return key["KeyMetadata"]["Arn"]


def rfc_1123_datetime(src: datetime.datetime) -> str:
    return src.strftime(RFC1123)


def str_to_rfc_1123_datetime(value: str) -> datetime.datetime:
    return datetime.datetime.strptime(value, RFC1123).replace(tzinfo=_gmt_zone_info)


def second_resolution_datetime(src: datetime.datetime) -> datetime.datetime:
    return src.replace(microsecond=0)


def add_expiration_days_to_datetime(user_datatime: datetime.datetime, exp_days: int) -> str:
    """
    This adds expiration days to a datetime, rounding to the next day at midnight UTC.
    :param user_datatime: datetime object
    :param exp_days: provided days
    :return: a datetime rounded to midnight, formatted as an RFC 1123 string
    """
    rounded_datetime = user_datatime.replace(
        hour=0, minute=0, second=0, microsecond=0
    ) + datetime.timedelta(days=exp_days + 1)

    return rfc_1123_datetime(rounded_datetime)


def serialize_expiration_header(
    rule_id: str, lifecycle_exp: LifecycleExpiration, last_modified: datetime.datetime
):
    if exp_days := lifecycle_exp.get("Days"):
        # AWS rounds to the next day at midnight UTC
        exp_date = add_expiration_days_to_datetime(last_modified, exp_days)
    else:
        exp_date = rfc_1123_datetime(lifecycle_exp["Date"])

    return f'expiry-date="{exp_date}", rule-id="{rule_id}"'


def get_lifecycle_rule_from_object(
    lifecycle_conf_rules: LifecycleRules,
    object_key: ObjectKey,
    size: ObjectSize,
    object_tags: dict[str, str],
) -> LifecycleRule:
    for rule in lifecycle_conf_rules:
        if not (expiration := rule.get("Expiration")) or "ExpiredObjectDeleteMarker" in expiration:
            continue

        if not (rule_filter := rule.get("Filter")):
            return rule

        if and_rules := rule_filter.get("And"):
            if all(
                _match_lifecycle_filter(key, value, object_key, size, object_tags)
                for key, value in and_rules.items()
            ):
                return rule

        if any(
            _match_lifecycle_filter(key, value, object_key, size, object_tags)
            for key, value in rule_filter.items()
        ):
            # after validation, we can only have one of `Prefix`, `Tag`, `ObjectSizeGreaterThan` or
            # `ObjectSizeLessThan` in the dict. Instead of manually checking, we can iterate over the only key and
            # try to match it
            return rule


def _match_lifecycle_filter(
    filter_key: str,
    filter_value: str | int | dict[str, str],
    object_key: ObjectKey,
    size: ObjectSize,
    object_tags: dict[str, str],
):
    match filter_key:
        case "Prefix":
            return object_key.startswith(filter_value)
        case "Tag":
            return object_tags and object_tags.get(filter_value.get("Key")) == filter_value.get(
                "Value"
            )
        case "ObjectSizeGreaterThan":
            return size > filter_value
        case "ObjectSizeLessThan":
            return size < filter_value
        case "Tags":  # this is inside the `And` field
            return object_tags and all(
                object_tags.get(tag.get("Key")) == tag.get("Value") for tag in filter_value
            )


def parse_expiration_header(
    expiration_header: str,
) -> tuple[datetime.datetime | None, str | None]:
    try:
        header_values = dict(
            (p.strip('"') for p in v.split("=")) for v in expiration_header.split('", ')
        )
        expiration_date = str_to_rfc_1123_datetime(header_values["expiry-date"])
        return expiration_date, header_values["rule-id"]

    except (IndexError, ValueError, KeyError):
        return None, None
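

# Illustrative example (not part of the original module): a round trip between
# serialize_expiration_header and parse_expiration_header for a 3-day lifecycle rule.
# The dates simply follow the rounding logic above (next day at midnight UTC); the
# formatted weekday/month names assume an English locale for strftime.
def _demo_expiration_header_roundtrip():
    last_modified = datetime.datetime(2025, 12, 4, 15, 30, tzinfo=_gmt_zone_info)
    header = serialize_expiration_header("rule-1", {"Days": 3}, last_modified)
    assert header == 'expiry-date="Mon, 08 Dec 2025 00:00:00 GMT", rule-id="rule-1"'
    expiration_date, rule_id = parse_expiration_header(header)
    assert rule_id == "rule-1"
    assert expiration_date == datetime.datetime(2025, 12, 8, tzinfo=_gmt_zone_info)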


def validate_dict_fields(data: dict, required_fields: set, optional_fields: set = None):
    """
    Validate whether the `data` dict contains at least the required fields and not more than the union of the required
    and optional fields
    TODO: we could pass the TypedDict to also use its required/optional properties, but it could be sensitive to
     mistakes/changes in the specs and not always right
    :param data: the dict we want to validate
    :param required_fields: a set containing the required fields
    :param optional_fields: a set containing the optional fields
    :return: bool, whether the dict is valid or not
    """
    if optional_fields is None:
        optional_fields = set()
    return (set_fields := set(data)) >= required_fields and set_fields <= (
        required_fields | optional_fields
    )
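

# Illustrative example (not part of the original module): the set comparison above accepts
# missing optional fields but rejects missing required fields and unknown fields.
def _demo_validate_dict_fields():
    required, optional = {"Key"}, {"VersionId"}
    assert validate_dict_fields({"Key": "k"}, required, optional)
    assert validate_dict_fields({"Key": "k", "VersionId": "v"}, required, optional)
    assert not validate_dict_fields({"VersionId": "v"}, required, optional)  # missing required field
    assert not validate_dict_fields({"Key": "k", "Extra": 1}, required, optional)  # unknown field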


def parse_tagging_header(tagging_header: TaggingHeader) -> dict:
    try:
        parsed_tags = urlparser.parse_qs(tagging_header, keep_blank_values=True)
        tags: dict[str, str] = {}
        for key, val in parsed_tags.items():
            if len(val) != 1 or not TAG_REGEX.match(key) or not TAG_REGEX.match(val[0]):
                raise InvalidArgument(
                    "The header 'x-amz-tagging' shall be encoded as UTF-8 then URLEncoded URL query parameters without tag name duplicates.",
                    ArgumentName="x-amz-tagging",
                    ArgumentValue=tagging_header,
                )
            elif key.startswith("aws:"):
                raise
            tags[key] = val[0]
        return tags

    except ValueError:
        raise InvalidArgument(
            "The header 'x-amz-tagging' shall be encoded as UTF-8 then URLEncoded URL query parameters without tag name duplicates.",
            ArgumentName="x-amz-tagging",
            ArgumentValue=tagging_header,
        )
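

# Illustrative example (not part of the original module): the x-amz-tagging header is
# parsed like a URL query string, with exactly one value per tag key.
def _demo_parse_tagging_header():
    assert parse_tagging_header("project=localstack&env=dev") == {
        "project": "localstack",
        "env": "dev",
    }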


def validate_tag_set(
    tag_set: TagSet, type_set: Literal["bucket", "object", "create-bucket"] = "bucket"
):
    keys = set()
    for tag in tag_set:
        if set(tag) != {"Key", "Value"}:
            raise MalformedXML()

        key = tag["Key"]
        value = tag["Value"]

        if key is None or value is None:
            raise MalformedXML()

        if key in keys:
            raise InvalidTag(
                "Cannot provide multiple Tags with the same key",
                TagKey=key,
            )

        if key.startswith("aws:"):
            if type_set == "bucket":
                message = "System tags cannot be added/updated by requester"
            elif type_set == "object":
                message = "Your TagKey cannot be prefixed with aws:"
            else:
                message = 'User-defined tag keys can\'t start with "aws:". This prefix is reserved for system tags. Remove "aws:" from your tag keys and try again.'
            raise InvalidTag(
                message,
                # weirdly, AWS does not return the `TagKey` field here, but it does if the TagKey does not match the
                # regex in the next step
                TagKey=key if type_set != "create-bucket" else None,
            )

        if not TAG_REGEX.match(key):
            raise InvalidTag(
                "The TagKey you have provided is invalid",
                TagKey=key,
            )
        elif not TAG_REGEX.match(value):
            raise InvalidTag(
                "The TagValue you have provided is invalid", TagKey=key, TagValue=value
            )

        keys.add(key)


def validate_location_constraint(context_region: str, location_constraint: str) -> None:
    if location_constraint:
        if context_region == AWS_REGION_US_EAST_1:
            if (
                not config.ALLOW_NONSTANDARD_REGIONS
                and location_constraint not in BUCKET_LOCATION_CONSTRAINTS
            ):
                raise InvalidLocationConstraint(
                    "The specified location-constraint is not valid",
                    LocationConstraint=location_constraint,
                )
        elif context_region == AWS_REGION_EU_WEST_1:
            if location_constraint not in EU_WEST_1_LOCATION_CONSTRAINTS:
                raise IllegalLocationConstraintException(location_constraint)
        elif context_region != location_constraint:
            raise IllegalLocationConstraintException(location_constraint)
    else:
        if context_region != AWS_REGION_US_EAST_1:
            raise IllegalLocationConstraintException("unspecified")


def get_unique_key_id(
    bucket: BucketName, object_key: ObjectKey, version_id: ObjectVersionId
) -> str:
    return f"{bucket}/{object_key}/{version_id or 'null'}"


def get_retention_from_now(days: int = None, years: int = None) -> datetime.datetime:
    """
    This calculates a retention date from now, adding days or years to it
    :param days: provided days
    :param years: provided years, exclusive with days
    :return: return a datetime object
    """
    if not days and not years:
        raise ValueError("Either 'days' or 'years' needs to be provided")
    now = datetime.datetime.now(tz=_gmt_zone_info)
    if days:
        retention = now + datetime.timedelta(days=days)
    else:
        retention = now.replace(year=now.year + years)

    return retention


def get_failed_precondition_copy_source(
    request: CopyObjectRequest, last_modified: datetime.datetime, etag: ETag
) -> str | None:
    """
    Validate if the source object LastModified and ETag match a precondition, and if they do, return the failed
    precondition
    # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
    :param request: the CopyObjectRequest
    :param last_modified: source object LastModified
    :param etag: source object ETag
    :return str: the failed precondition to raise
    """
    last_modified = second_resolution_datetime(last_modified)
    if (cs_if_match := request.get("CopySourceIfMatch")) and etag.strip('"') != cs_if_match.strip(
        '"'
    ):
        return "x-amz-copy-source-If-Match"

    elif (
        cs_if_unmodified_since := request.get("CopySourceIfUnmodifiedSince")
    ) and last_modified > second_resolution_datetime(cs_if_unmodified_since):
        return "x-amz-copy-source-If-Unmodified-Since"

    elif (cs_if_none_match := request.get("CopySourceIfNoneMatch")) and etag.strip(
        '"'
    ) == cs_if_none_match.strip('"'):
        return "x-amz-copy-source-If-None-Match"

    elif (
        cs_if_modified_since := request.get("CopySourceIfModifiedSince")
    ) and last_modified <= second_resolution_datetime(cs_if_modified_since) < datetime.datetime.now(
        tz=_gmt_zone_info
    ):
        return "x-amz-copy-source-If-Modified-Since"


def validate_failed_precondition(
    request: GetObjectRequest | HeadObjectRequest, last_modified: datetime.datetime, etag: ETag
) -> None:
    """
    Validate if the object LastModified and ETag match a precondition, and if they do, raise the corresponding error
    :param request: the GetObjectRequest or HeadObjectRequest
    :param last_modified: S3 object LastModified
    :param etag: S3 object ETag
    :raises PreconditionFailed
    :raises NotModified, 304 with an empty body
    """
    precondition_failed = None
    # last_modified needs to be rounded to a second so that strict equality can be enforced from an RFC1123 header
    last_modified = second_resolution_datetime(last_modified)
    if (if_match := request.get("IfMatch")) and etag != if_match.strip('"'):
        precondition_failed = "If-Match"

    elif (
        if_unmodified_since := request.get("IfUnmodifiedSince")
    ) and last_modified > second_resolution_datetime(if_unmodified_since):
        precondition_failed = "If-Unmodified-Since"

    if precondition_failed:
        raise PreconditionFailed(
            "At least one of the pre-conditions you specified did not hold",
            Condition=precondition_failed,
        )

    if ((if_none_match := request.get("IfNoneMatch")) and etag == if_none_match.strip('"')) or (
        (if_modified_since := request.get("IfModifiedSince"))
        and last_modified
        <= second_resolution_datetime(if_modified_since)
        < datetime.datetime.now(tz=_gmt_zone_info)
    ):
        raise CommonServiceException(
            message="Not Modified",
            code="NotModified",
            status_code=304,
        )


def get_canned_acl(
    canned_acl: BucketCannedACL | ObjectCannedACL, owner: Owner
) -> AccessControlPolicy:
    """
    Return the proper Owner and Grants from a CannedACL
    See https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl
    :param canned_acl: an S3 CannedACL
    :param owner: the current owner of the bucket or object
    :return: an AccessControlPolicy containing the Grants and Owner
    """
    owner_grantee = Grantee(**owner, Type=GranteeType.CanonicalUser)
    grants = [Grant(Grantee=owner_grantee, Permission=Permission.FULL_CONTROL)]

    match canned_acl:
        case ObjectCannedACL.private:
            pass  # no other permissions
        case ObjectCannedACL.public_read:
            grants.append(Grant(Grantee=ALL_USERS_ACL_GRANTEE, Permission=Permission.READ))

        case ObjectCannedACL.public_read_write:
            grants.append(Grant(Grantee=ALL_USERS_ACL_GRANTEE, Permission=Permission.READ))
            grants.append(Grant(Grantee=ALL_USERS_ACL_GRANTEE, Permission=Permission.WRITE))
        case ObjectCannedACL.authenticated_read:
            grants.append(
                Grant(Grantee=AUTHENTICATED_USERS_ACL_GRANTEE, Permission=Permission.READ)
            )
        case ObjectCannedACL.bucket_owner_read:
            pass  # TODO: bucket owner ACL
        case ObjectCannedACL.bucket_owner_full_control:
            pass  # TODO: bucket owner ACL
        case ObjectCannedACL.aws_exec_read:
            pass  # TODO: bucket owner, EC2 Read
        case BucketCannedACL.log_delivery_write:
            grants.append(Grant(Grantee=LOG_DELIVERY_ACL_GRANTEE, Permission=Permission.READ_ACP))
            grants.append(Grant(Grantee=LOG_DELIVERY_ACL_GRANTEE, Permission=Permission.WRITE))

    return AccessControlPolicy(Owner=owner, Grants=grants)


def create_redirect_for_post_request(
    base_redirect: str, bucket: BucketName, object_key: ObjectKey, etag: ETag
):
    """
    POST requests can redirect if successful. This takes the URL provided and appends query string parameters
    (key, bucket and ETag). It needs to be a full URL.
    :param base_redirect: the URL provided for redirection
    :param bucket: bucket name
    :param object_key: object key
    :param etag: key ETag
    :return: the URL provided with the new appended query string parameters
    """
    parts = urlparser.urlparse(base_redirect)
    if not parts.netloc:
        raise ValueError("The provided URL is not valid")
    queryargs = urlparser.parse_qs(parts.query)
    queryargs["key"] = [object_key]
    queryargs["bucket"] = [bucket]
    queryargs["etag"] = [etag]
    redirect_queryargs = urlparser.urlencode(queryargs, doseq=True)
    newparts = (
        parts.scheme,
        parts.netloc,
        parts.path,
        parts.params,
        redirect_queryargs,
        parts.fragment,
    )
    return urlparser.urlunparse(newparts)
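

# Illustrative example (not part of the original module): the redirect URL provided on a
# POST upload gets the key, bucket and etag appended to any existing query parameters.
def _demo_create_redirect_for_post_request():
    assert create_redirect_for_post_request(
        "https://example.com/done?foo=bar", "my-bucket", "my-key", "my-etag"
    ) == "https://example.com/done?foo=bar&key=my-key&bucket=my-bucket&etag=my-etag"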


def parse_post_object_tagging_xml(tagging: str) -> dict | None:
    try:
        tag_set = {}
        tags = xmltodict.parse(tagging)
        xml_tags = tags.get("Tagging", {}).get("TagSet", {}).get("Tag", [])
        if not xml_tags:
            # if the Tagging does not respect the schema, just return
            return
        if not isinstance(xml_tags, list):
            xml_tags = [xml_tags]
        for tag in xml_tags:
            tag_set[tag["Key"]] = tag["Value"]

        return tag_set

    except Exception:
        raise MalformedXML()


def generate_safe_version_id() -> str:
    """
    Generate a safe version id for XML rendering.
    VersionId cannot have `-` in it, as it fails in XML.
    Combine an ever-increasing part in the first 8 characters, and a random element.
    We need the sequence part in order to properly implement pagination around ListObjectVersions.
    By prefixing the version-id with a globally increasing number, we can sort the versions
    :return: an S3 VersionId containing a timestamp part in the first 8 characters
    """
    tok = next(global_version_id_sequence()).to_bytes(length=6) + token_bytes(18)
    return base64.b64encode(tok, altchars=b"._").rstrip(b"=").decode("ascii")


@singleton_factory
def global_version_id_sequence():
    start = int(time.time() * 1000)
    # itertools.count is thread safe over the GIL since its getAndIncrement operation is a single python bytecode op
    return itertools.count(start)


def is_version_older_than_other(version_id: str, other: str):
    """
    Compare the sequence part of a VersionId against the sequence part of a VersionIdMarker. Used for pagination.
    See `generate_safe_version_id`
    """
    return base64.b64decode(version_id, altchars=b"._") < base64.b64decode(other, altchars=b"._")
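

# Illustrative example (not part of the original module): because version ids start with an
# encoded, ever-increasing sequence number, a later version always compares as newer.
def _demo_version_id_ordering():
    first = generate_safe_version_id()
    second = generate_safe_version_id()
    assert "-" not in first  # safe for XML rendering
    assert is_version_older_than_other(first, second)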


def get_bucket_location_xml(location_constraint: str) -> str:
    """
    Returns the formatted XML for the GetBucketLocation operation.

    :param location_constraint: The location constraint to return in the XML. It can be an empty string when
    it's not specified in the bucket configuration.
    :return: The XML response.
    """

    return (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/"'
        + ("/>" if not location_constraint else f">{location_constraint}</LocationConstraint>")
    )
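

# Illustrative example (not part of the original module): the two shapes of the
# GetBucketLocation response, for a bucket in eu-west-1 and for the default case
# where the constraint is an empty string.
def _demo_get_bucket_location_xml():
    assert get_bucket_location_xml("eu-west-1").endswith(
        '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">eu-west-1</LocationConstraint>'
    )
    assert get_bucket_location_xml("").endswith(
        '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>'
    )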