• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.72
/localstack-core/localstack/services/kms/utils.py
1
import base64
1✔
2
import re
1✔
3
from collections.abc import Callable
1✔
4

5
from localstack.aws.api.kms import (
1✔
6
    CreateKeyRequest,
7
    DryRunOperationException,
8
    Tag,
9
    TagException,
10
    TagList,
11
)
12
from localstack.constants import TAG_KEY_CUSTOM_ID
1✔
13
from localstack.services.kms.exceptions import ValidationException
1✔
14
from localstack.utils.aws.arns import ARN_PARTITION_REGEX
1✔
15

16
KMS_KEY_ARN_PATTERN = re.compile(
1✔
17
    rf"{ARN_PARTITION_REGEX}:kms:(?P<region_name>[^:]+):(?P<account_id>\d{{12}}):((?=key/)key/|(?=alias/))(?P<key_id>[^:]+)$"
18
)
19
# special tag name to allow specifying a custom key material for created keys
20
TAG_KEY_CUSTOM_KEY_MATERIAL = "_custom_key_material_"
1✔
21

22

23
def get_hash_algorithm(signing_algorithm: str) -> str:
1✔
24
    """
25
    Return the hashing algorithm for a given signing algorithm.
26
    eg. "RSASSA_PSS_SHA_512" -> "SHA_512"
27
    """
28
    return "_".join(signing_algorithm.rsplit(sep="_", maxsplit=-2)[-2:])
1✔
29

30

31
def parse_key_arn(key_arn: str) -> tuple[str, str, str]:
1✔
32
    """
33
    Parse a valid KMS key arn into its constituents.
34

35
    :param key_arn: KMS key ARN
36
    :return: Tuple of account ID, region name and key ID
37
    """
38
    return KMS_KEY_ARN_PATTERN.match(key_arn).group("account_id", "region_name", "key_id")
1✔
39

40

41
def is_valid_key_arn(key_arn: str) -> bool:
1✔
42
    """
43
    Check if a given string is a valid KMS key ARN.
44
    """
45
    return KMS_KEY_ARN_PATTERN.match(key_arn) is not None
1✔
46

47

48
def validate_alias_name(alias_name: str) -> None:
1✔
49
    if not alias_name.startswith("alias/"):
1✔
50
        raise ValidationException(
1✔
51
            'Alias must start with the prefix "alias/". Please see '
52
            "https://docs.aws.amazon.com/kms/latest/developerguide/kms-alias.html"
53
        )
54

55

56
def validate_tag(tag_position: int, tag: Tag) -> None:
1✔
57
    tag_key = tag.get("TagKey")
1✔
58
    tag_value = tag.get("TagValue")
1✔
59

60
    if len(tag_key) > 128:
1✔
61
        raise ValidationException(
1✔
62
            f"1 validation error detected: Value '{tag_key}' at 'tags.{tag_position}.member.tagKey' failed to satisfy constraint: Member must have length less than or equal to 128"
63
        )
64
    if len(tag_value) > 256:
1✔
65
        raise ValidationException(
1✔
66
            f"1 validation error detected: Value '{tag_value}' at 'tags.{tag_position}.member.tagValue' failed to satisfy constraint: Member must have length less than or equal to 256"
67
        )
68

69
    if tag_key.lower().startswith("aws:"):
1✔
70
        raise TagException("Tags beginning with aws: are reserved")
1✔
71

72

73
def validate_and_filter_tags(tag_list: TagList) -> TagList:
1✔
74
    """
75
    Validates tags and filters out LocalStack specific tags
76

77
    :param tag_list: The list of tags provided to apply to the KMS key.
78
    :returns: Filtered and validated list of tags to apply to the KMS key.
79
    """
80
    unique_tag_keys = {tag["TagKey"] for tag in tag_list}
1✔
81
    if len(unique_tag_keys) < len(tag_list):
1✔
82
        raise TagException("Duplicate tag keys")
1✔
83
    if len(tag_list) > 50:
1✔
84
        raise TagException("Too many tags")
1✔
85

86
    validated_tags = []
1✔
87
    for i, tag in enumerate(tag_list, start=1):
1✔
88
        if tag["TagKey"] != TAG_KEY_CUSTOM_KEY_MATERIAL:
1✔
89
            validate_tag(i, tag)
1✔
90
            validated_tags.append(tag)
1✔
91

92
    return validated_tags
1✔
93

94

95
def get_custom_key_material(request: CreateKeyRequest | None) -> bytes | None:
1✔
96
    """
97
    Retrieves custom material which is sent in a CreateKeyRequest via the Tags.
98

99
    :param request: The request for creating a KMS key.
100
    :returns: Custom key material for the KMS key.
101
    """
102
    if not request:
1✔
103
        return None
×
104

105
    custom_key_material = None
1✔
106
    tags = request.get("Tags", [])
1✔
107
    for tag in tags:
1✔
108
        if tag["TagKey"] == TAG_KEY_CUSTOM_KEY_MATERIAL:
1✔
109
            custom_key_material = base64.b64decode(tag["TagValue"])
1✔
110

111
    return custom_key_material
1✔
112

113

114
def get_custom_key_id(request: CreateKeyRequest | None) -> bytes | None:
1✔
115
    """
116
    Retrieves a custom Key ID for the KMS key which is sent in a CreateKeyRequest via the Tags.
117

118
    :param request: The request for creating a KMS key.
119
    :returns: THe custom Key ID for the KMS key.
120
    """
121
    if not request:
1✔
122
        return None
×
123

124
    custom_key_id = None
1✔
125
    tags = request.get("Tags", [])
1✔
126
    for tag in tags:
1✔
127
        if tag["TagKey"] == TAG_KEY_CUSTOM_ID:
1✔
128
            custom_key_id = tag["TagValue"]
1✔
129

130
    return custom_key_id
1✔
131

132

133
def execute_dry_run_capable[T](func: Callable[..., T], dry_run: bool, *args, **kwargs) -> T:
1✔
134
    """
135
    Executes a function unless dry run mode is enabled.
136

137
    If ``dry_run`` is ``True``, the function is not executed and a
138
    ``DryRunOperationException`` is raised. Otherwise, the provided
139
    function is called with the given positional and keyword arguments.
140

141
    :param func: The function to be executed.
142
    :type func: Callable[..., T]
143
    :param dry_run: Flag indicating whether the execution is a dry run.
144
    :type dry_run: bool
145
    :param args: Positional arguments to pass to the function.
146
    :param kwargs: Keyword arguments to pass to the function.
147
    :returns: The result of the function call if ``dry_run`` is ``False``.
148
    :rtype: T
149
    :raises DryRunOperationException: If ``dry_run`` is ``True``.
150
    """
151
    if dry_run:
1✔
152
        raise DryRunOperationException(
1✔
153
            "The request would have succeeded, but the DryRun option is set."
154
        )
155
    return func(*args, **kwargs)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc