• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19533426081

20 Nov 2025 08:52AM UTC coverage: 86.922% (+0.02%) from 86.903%
19533426081

push

github

web-flow
SQS: fix FIFO message visibility when extending timeout (#13386)

21 of 21 new or added lines in 1 file covered. (100.0%)

254 existing lines in 10 files now uncovered.

68677 of 79010 relevant lines covered (86.92%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.46
/localstack-core/localstack/services/kms/models.py
1
import base64
1✔
2
import datetime
1✔
3
import io
1✔
4
import json
1✔
5
import logging
1✔
6
import os
1✔
7
import random
1✔
8
import re
1✔
9
import struct
1✔
10
import uuid
1✔
11
from collections import namedtuple
1✔
12
from dataclasses import dataclass
1✔
13

14
from cryptography.exceptions import InvalidSignature, InvalidTag, UnsupportedAlgorithm
1✔
15
from cryptography.hazmat.backends import default_backend
1✔
16
from cryptography.hazmat.primitives import hashes, hmac
1✔
17
from cryptography.hazmat.primitives import serialization as crypto_serialization
1✔
18
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa, utils
1✔
19
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
1✔
20
from cryptography.hazmat.primitives.asymmetric.padding import PSS, PKCS1v15
1✔
21
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
1✔
22
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
1✔
23
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
1✔
24
from cryptography.hazmat.primitives.serialization import load_der_public_key
1✔
25

26
from localstack.aws.api.kms import (
1✔
27
    CreateAliasRequest,
28
    CreateGrantRequest,
29
    CreateKeyRequest,
30
    EncryptionContextType,
31
    InvalidCiphertextException,
32
    InvalidKeyUsageException,
33
    KeyMetadata,
34
    KeySpec,
35
    KeyState,
36
    KeyUsageType,
37
    KMSInvalidMacException,
38
    KMSInvalidSignatureException,
39
    LimitExceededException,
40
    MacAlgorithmSpec,
41
    MessageType,
42
    MultiRegionConfiguration,
43
    MultiRegionKey,
44
    MultiRegionKeyType,
45
    OriginType,
46
    ReplicateKeyRequest,
47
    SigningAlgorithmSpec,
48
    TagList,
49
    UnsupportedOperationException,
50
)
51
from localstack.constants import TAG_KEY_CUSTOM_ID
1✔
52
from localstack.services.kms.exceptions import TagException, ValidationException
1✔
53
from localstack.services.kms.utils import is_valid_key_arn, validate_tag
1✔
54
from localstack.services.stores import AccountRegionBundle, BaseStore, LocalAttribute
1✔
55
from localstack.utils.aws.arns import get_partition, kms_alias_arn, kms_key_arn
1✔
56
from localstack.utils.crypto import decrypt, encrypt
1✔
57
from localstack.utils.strings import long_uid, to_bytes, to_str
1✔
58

59
LOG = logging.getLogger(__name__)
1✔
60

61
PATTERN_UUID = re.compile(
1✔
62
    r"^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$"
63
)
64
MULTI_REGION_PATTERN = re.compile(r"^mrk-[a-fA-F0-9]{32}$")
1✔
65

66
SYMMETRIC_DEFAULT_MATERIAL_LENGTH = 32
1✔
67

68
RSA_CRYPTO_KEY_LENGTHS = {
1✔
69
    "RSA_2048": 2048,
70
    "RSA_3072": 3072,
71
    "RSA_4096": 4096,
72
}
73

74
ECC_CURVES = {
1✔
75
    "ECC_NIST_P256": ec.SECP256R1(),
76
    "ECC_NIST_P384": ec.SECP384R1(),
77
    "ECC_NIST_P521": ec.SECP521R1(),
78
    "ECC_SECG_P256K1": ec.SECP256K1(),
79
}
80

81
HMAC_RANGE_KEY_LENGTHS = {
1✔
82
    "HMAC_224": (28, 64),
83
    "HMAC_256": (32, 64),
84
    "HMAC_384": (48, 128),
85
    "HMAC_512": (64, 128),
86
}
87

88
ON_DEMAND_ROTATION_LIMIT = 10
1✔
89
KEY_ID_LEN = 36
1✔
90
# Moto uses IV_LEN of 12, as it is fine for GCM encryption mode, but we use CBC, so have to set it to 16.
91
IV_LEN = 16
1✔
92
TAG_LEN = 16
1✔
93
CIPHERTEXT_HEADER_FORMAT = f">{KEY_ID_LEN}s{IV_LEN}s{TAG_LEN}s"
1✔
94
HEADER_LEN = KEY_ID_LEN + IV_LEN + TAG_LEN
1✔
95
Ciphertext = namedtuple("Ciphertext", ("key_id", "iv", "ciphertext", "tag"))
1✔
96

97
RESERVED_ALIASES = [
1✔
98
    "alias/aws/acm",
99
    "alias/aws/dynamodb",
100
    "alias/aws/ebs",
101
    "alias/aws/elasticfilesystem",
102
    "alias/aws/es",
103
    "alias/aws/glue",
104
    "alias/aws/kinesisvideo",
105
    "alias/aws/lambda",
106
    "alias/aws/rds",
107
    "alias/aws/redshift",
108
    "alias/aws/s3",
109
    "alias/aws/secretsmanager",
110
    "alias/aws/ssm",
111
    "alias/aws/xray",
112
]
113

114
# list of key names that should be skipped when serializing the encryption context
115
IGNORED_CONTEXT_KEYS = ["aws-crypto-public-key"]
1✔
116

117
# special tag name to allow specifying a custom key material for created keys
118
TAG_KEY_CUSTOM_KEY_MATERIAL = "_custom_key_material_"
1✔
119

120

121
def _serialize_ciphertext_blob(ciphertext: Ciphertext) -> bytes:
    """
    Flatten a ``Ciphertext`` tuple into one binary blob.

    The blob starts with a header packed according to ``CIPHERTEXT_HEADER_FORMAT``
    (UTF-8 encoded key ID, IV, tag - in that order) and is followed by the raw
    ciphertext bytes.

    :param ciphertext: the parts to serialize
    :return: header bytes immediately followed by the ciphertext bytes
    """
    encoded_key_id = ciphertext.key_id.encode("utf-8")
    packed_header = struct.pack(
        CIPHERTEXT_HEADER_FORMAT, encoded_key_id, ciphertext.iv, ciphertext.tag
    )
    return packed_header + ciphertext.ciphertext
129

130

131
def deserialize_ciphertext_blob(ciphertext_blob: bytes) -> Ciphertext:
    """
    Split a binary blob back into its ``Ciphertext`` parts.

    The first ``HEADER_LEN`` bytes are unpacked with ``CIPHERTEXT_HEADER_FORMAT``
    into key ID, IV and tag; everything after the header is the ciphertext.

    :param ciphertext_blob: blob laid out as header followed by ciphertext
    :return: a ``Ciphertext`` whose ``key_id`` is decoded from UTF-8
    """
    header_bytes, payload = ciphertext_blob[:HEADER_LEN], ciphertext_blob[HEADER_LEN:]
    raw_key_id, iv, tag = struct.unpack(CIPHERTEXT_HEADER_FORMAT, header_bytes)
    return Ciphertext(
        key_id=raw_key_id.decode("utf-8"),
        iv=iv,
        ciphertext=payload,
        tag=tag,
    )
136

137

138
def _serialize_encryption_context(encryption_context: EncryptionContextType | None) -> bytes:
    """
    Turn an encryption context into the bytes used as additional authenticated data.

    Entries are visited in ascending key order; for each entry the UTF-8 encoded key
    and then the UTF-8 encoded value are appended. Keys listed in
    ``IGNORED_CONTEXT_KEYS`` are left out.

    :param encryption_context: mapping of context keys to values, may be empty or None
    :return: the concatenated bytes, or ``b""`` when there is no context
    """
    if not encryption_context:
        return b""

    pieces = []
    for name, value in sorted(encryption_context.items(), key=lambda pair: pair[0]):
        # reserved key-value pairs are not part of the additional authentication data
        if name in IGNORED_CONTEXT_KEYS:
            continue
        pieces.append(name.encode("utf-8"))
        pieces.append(value.encode("utf-8"))
    return b"".join(pieces)
149

150

151
# Confusion alert!
152
# In KMS, there are two things that can be called "keys":
153
#   1. A cryptographic key, i.e. a string of characters, a private/public/symmetrical key for cryptographic encoding
154
#   and decoding etc. It is modeled here by KmsCryptoKey class.
155
#   2. An AWS object that stores both a cryptographic key and some relevant metadata, e.g. creation time, a unique ID,
156
#   some state. It is modeled by KmsKey class.
157
#
158
# While KmsKeys always contain KmsCryptoKeys, sometimes KmsCryptoKeys exist without corresponding KmsKeys,
159
# e.g. GenerateDataKeyPair API call returns contents of a new KmsCryptoKey that is not associated with any KmsKey,
160
# but is partially encrypted by some pre-existing KmsKey.
161

162

163
class KmsCryptoKey:
    """
    A cryptographic key as such, without any of the KMS metadata around it.

    KmsCryptoKeys are used to model both of the cases where AWS generates keys:
    1. Keys created to be used inside of AWS. The key material / private key of such a key is not to
    leave AWS unencrypted. If it has to leave AWS, a different KmsCryptoKey is used to encrypt it first.
    2. Keys that AWS creates for customers for some external use. Such a key might be returned to a
    customer with its key material or public key unencrypted - see KMS GenerateDataKey /
    GenerateDataKeyPair. Such a key is neither stored nor used by AWS.
    """

    public_key: bytes | None
    private_key: bytes | None
    key_material: bytes
    pending_key_material: bytes | None
    key_spec: str

    @staticmethod
    def assert_valid(key_spec: str):
        """
        Check that ``key_spec`` names a key specification this model can handle.

        :param key_spec: The key specification to validate.
        :type key_spec: str
        :raises ValidationException: If ``key_spec`` starts with a known family prefix
            (RSA, ECC, HMAC) but is not one of the known specs of that family.
        :raises UnsupportedOperationException: If ``key_spec`` belongs to no known family.
        """
        if key_spec == "SYMMETRIC_DEFAULT":
            return

        # family prefix -> table holding every known spec of that family
        specs_by_family = (
            ("RSA", RSA_CRYPTO_KEY_LENGTHS),
            ("ECC", ECC_CURVES),
            ("HMAC", HMAC_RANGE_KEY_LENGTHS),
        )
        for family_prefix, known_specs in specs_by_family:
            if not key_spec.startswith(family_prefix):
                continue
            if key_spec in known_specs:
                return
            raise ValidationException(
                f"1 validation error detected: Value '{key_spec}' at 'keySpec' "
                f"failed to satisfy constraint: Member must satisfy enum value set: "
                f"[RSA_2048, ECC_NIST_P384, ECC_NIST_P256, ECC_NIST_P521, HMAC_384, RSA_3072, "
                f"ECC_SECG_P256K1, RSA_4096, SYMMETRIC_DEFAULT, HMAC_256, HMAC_224, HMAC_512]"
            )

        raise UnsupportedOperationException(f"KeySpec {key_spec} is not supported")

    def __init__(self, key_spec: str, key_material: bytes | None = None):
        """
        Create the key for ``key_spec``, generating whatever is not supplied.

        :param key_spec: the key specification, validated with ``assert_valid``
        :param key_material: optional custom material. Used as the symmetric / HMAC key
            material, or parsed as a DER private key for ECC specs. For RSA specs a fresh
            key pair is generated regardless.
        """
        self.private_key = None
        self.public_key = None
        self.pending_key_material = None
        # Technically, key_material, being a symmetric encryption key, is only relevant for
        #   key_spec == SYMMETRIC_DEFAULT.
        # But LocalStack uses symmetric encryption with this key_material even for other specs.
        # Asymmetric keys are generated, but are not actually used for encryption. Signing is
        # different.
        self.key_material = key_material or os.urandom(SYMMETRIC_DEFAULT_MATERIAL_LENGTH)
        self.key_spec = key_spec

        KmsCryptoKey.assert_valid(key_spec)

        if key_spec == "SYMMETRIC_DEFAULT":
            return

        if key_spec.startswith("HMAC"):
            # without custom material, pick a random length within the range allowed for the spec
            shortest, longest = HMAC_RANGE_KEY_LENGTHS.get(key_spec)
            self.key_material = key_material or os.urandom(random.randint(shortest, longest))
            return

        if key_spec.startswith("RSA"):
            generated_key = rsa.generate_private_key(
                public_exponent=65537, key_size=RSA_CRYPTO_KEY_LENGTHS.get(key_spec)
            )
        elif key_spec.startswith("ECC"):
            if key_material:
                generated_key = crypto_serialization.load_der_private_key(
                    key_material, password=None
                )
            else:
                generated_key = ec.generate_private_key(ECC_CURVES.get(key_spec))

        self._serialize_key(generated_key)

    def load_key_material(self, material: bytes):
        """
        Take over externally supplied ``material``.

        For SYMMETRIC_DEFAULT keys the material is only stored as pending; for HMAC keys it
        replaces the key material; for every other spec it is parsed as a DER private key and
        the stored key pair is replaced.
        """
        hmac_specs = (
            KeySpec.HMAC_224,
            KeySpec.HMAC_256,
            KeySpec.HMAC_384,
            KeySpec.HMAC_512,
        )
        if self.key_spec == KeySpec.SYMMETRIC_DEFAULT:
            self.pending_key_material = material
            return
        if self.key_spec in hmac_specs:
            self.key_material = material
            return
        self._serialize_key(crypto_serialization.load_der_private_key(material, password=None))

    def _serialize_key(self, key: ec.EllipticCurvePrivateKey | rsa.RSAPrivateKey):
        """Store ``key`` as DER bytes: SubjectPublicKeyInfo public part, unencrypted PKCS8 private part."""
        der = crypto_serialization.Encoding.DER
        self.public_key = key.public_key().public_bytes(
            der, crypto_serialization.PublicFormat.SubjectPublicKeyInfo
        )
        self.private_key = key.private_bytes(
            der,
            crypto_serialization.PrivateFormat.PKCS8,
            crypto_serialization.NoEncryption(),
        )

    @property
    def key(self) -> RSAPrivateKey | EllipticCurvePrivateKey:
        """The private key object, parsed anew from the stored DER bytes on every access."""
        stored_der = self.private_key
        return crypto_serialization.load_der_private_key(
            stored_der, password=None, backend=default_backend()
        )
284

285

286
class KmsKey:
1✔
287
    metadata: KeyMetadata
1✔
288
    crypto_key: KmsCryptoKey
1✔
289
    tags: dict[str, str]
1✔
290
    policy: str
1✔
291
    is_key_rotation_enabled: bool
1✔
292
    rotation_period_in_days: int
1✔
293
    next_rotation_date: datetime.datetime
1✔
294
    previous_keys = [str]
1✔
295

296
    def __init__(
        self,
        create_key_request: CreateKeyRequest = None,
        account_id: str = None,
        region: str = None,
    ):
        """
        Build a key from a creation request.

        :param create_key_request: the request; an empty ``CreateKeyRequest`` is used when falsy
        :param account_id: account the key belongs to
        :param region: region the key lives in
        """
        if not create_key_request:
            create_key_request = CreateKeyRequest()
        self.previous_keys = []

        # Tags of a key could be present in the request, but they are not a part of metadata. At
        # least in the sense of DescribeKey not returning them with the rest of the metadata.
        # Instead, tags are more like aliases:
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html
        # "DescribeKey does not return the following information: ... Tags on the KMS key."
        self.tags = {}
        self.add_tags(create_key_request.get("Tags"))

        # Same goes for the policy. It is in the request, but not in the metadata.
        requested_policy = create_key_request.get("Policy")
        self.policy = requested_policy or self._get_default_key_policy(account_id, region)

        # https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html
        # "Automatic key rotation is disabled by default on customer managed keys but authorized
        # users can enable and disable it."
        self.is_key_rotation_enabled = False

        # must run after the tags are set: the metadata reads the custom ID tag
        self._populate_metadata(create_key_request, account_id, region)

        custom_key_material = None
        if TAG_KEY_CUSTOM_KEY_MATERIAL in self.tags:
            # The _custom_key_material_ tag carries base64 encoded material for this key. It is
            # taken out of the tags to not readily expose the custom key material.
            custom_key_material = base64.b64decode(self.tags.pop(TAG_KEY_CUSTOM_KEY_MATERIAL))
        self.crypto_key = KmsCryptoKey(self.metadata.get("KeySpec"), custom_key_material)
        self._internal_key_id = uuid.uuid4()

        # The KMS implementation always provides a crypto key with key material, which doesn't
        # suit scenarios where a KMS Key may have no key material, e.g. for external keys. Don't
        # expose the CurrentKeyMaterialId in those cases.
        is_aws_symmetric_key = (
            self.metadata["Origin"] == "AWS_KMS"
            and self.metadata["KeySpec"] == KeySpec.SYMMETRIC_DEFAULT
        )
        if custom_key_material or is_aws_symmetric_key:
            current_material = self.crypto_key.key_material
            self.metadata["CurrentKeyMaterialId"] = self.generate_key_material_id(current_material)

        self.rotation_period_in_days = 365
        self.next_rotation_date = None
343

344
    def generate_key_material_id(self, key_material: bytes) -> str:
1✔
345
        # The KeyMaterialId depends on the key material and the KeyId. Use an internal ID to prevent brute forcing
346
        # the value of the key material from the public KeyId and KeyMaterialId.
347
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ImportKeyMaterial.html
348
        key_material_id_hex = uuid.uuid5(self._internal_key_id, key_material).hex
1✔
349
        return str(key_material_id_hex) * 2
1✔
350

351
    def calculate_and_set_arn(self, account_id, region):
        """Compute the ARN from the key's ID, ``account_id`` and ``region`` and store it in the metadata."""
        key_id = self.metadata.get("KeyId")
        self.metadata["Arn"] = kms_key_arn(key_id, account_id, region)
353

354
    def generate_mac(self, msg: bytes, mac_algorithm: MacAlgorithmSpec) -> bytes:
        """Return the HMAC of ``msg`` computed with this key's material and ``mac_algorithm``."""
        mac_context = self._get_hmac_context(mac_algorithm)
        mac_context.update(msg)
        digest = mac_context.finalize()
        return digest
358

359
    def verify_mac(self, msg: bytes, mac: bytes, mac_algorithm: MacAlgorithmSpec) -> bool:
        """
        Check ``mac`` against the HMAC of ``msg``.

        :return: ``True`` when the MAC matches
        :raises KMSInvalidMacException: when the MAC does not match
        """
        mac_context = self._get_hmac_context(mac_algorithm)
        mac_context.update(msg)
        try:
            mac_context.verify(mac)
        except InvalidSignature:
            raise KMSInvalidMacException()
        return True
367

368
    # Encrypt is a method of KmsKey and not of KmsCryptoKey only because it requires KeyId, and KmsCryptoKeys do not
369
    # hold KeyIds. Maybe it would be possible to remodel this better.
370
    def encrypt(self, plaintext: bytes, encryption_context: EncryptionContextType = None) -> bytes:
        """
        Encrypt ``plaintext`` with this key's material and return the serialized ciphertext blob.

        A fresh random IV of ``IV_LEN`` bytes is drawn per call; the serialized encryption context
        is passed along as additional authenticated data. The resulting blob carries this key's ID.
        """
        init_vector = os.urandom(IV_LEN)
        aad = _serialize_encryption_context(encryption_context=encryption_context)
        encrypted, auth_tag = encrypt(self.crypto_key.key_material, plaintext, init_vector, aad)
        blob_parts = Ciphertext(
            key_id=self.metadata.get("KeyId"),
            iv=init_vector,
            ciphertext=encrypted,
            tag=auth_tag,
        )
        return _serialize_ciphertext_blob(ciphertext=blob_parts)
379

380
    # The ciphertext has to be deserialized before this call.
381
    def decrypt(
        self, ciphertext: Ciphertext, encryption_context: EncryptionContextType = None
    ) -> bytes:
        """
        Decrypt an already deserialized ``ciphertext``.

        The current key material is tried first, then every entry of ``previous_keys`` in order.
        The first material that decrypts without an ``InvalidTag`` / ``InvalidSignature`` wins.

        :raises InvalidCiphertextException: when none of the materials works
        """
        aad = _serialize_encryption_context(encryption_context=encryption_context)
        candidate_materials = [self.crypto_key.key_material]
        candidate_materials.extend(self.previous_keys)

        for material in candidate_materials:
            try:
                plaintext = decrypt(
                    material, ciphertext.ciphertext, ciphertext.iv, ciphertext.tag, aad
                )
            except (InvalidTag, InvalidSignature):
                # wrong material, move on to the next candidate
                continue
            return plaintext

        raise InvalidCiphertextException()
394

395
    def decrypt_rsa(self, encrypted: bytes) -> bytes:
        """Decrypt ``encrypted`` with the private key, using OAEP padding with SHA-256 (MGF1 and hash)."""
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        # the ``key`` property parses the stored DER private key
        return self.crypto_key.key.decrypt(encrypted, oaep)
408

409
    def sign(
        self, data: bytes, message_type: MessageType, signing_algorithm: SigningAlgorithmSpec
    ) -> bytes:
        """
        Sign ``data`` with the private key.

        Algorithms starting with "ECDSA" are signed with ECDSA; all others go through the
        padding chosen by ``_construct_sign_verify_padding``.

        :raises ValidationException: when the underlying sign call raises a ``ValueError``
        """
        hasher, wrapped_hasher = self._construct_sign_verify_hasher(signing_algorithm, message_type)
        is_ecdsa = signing_algorithm.startswith("ECDSA")
        try:
            if is_ecdsa:
                return self.crypto_key.key.sign(data, ec.ECDSA(wrapped_hasher))
            signature_padding = self._construct_sign_verify_padding(signing_algorithm, hasher)
            return self.crypto_key.key.sign(data, signature_padding, wrapped_hasher)
        except ValueError as exc:
            raise ValidationException(str(exc))
421

422
    def verify(
        self,
        data: bytes,
        message_type: MessageType,
        signing_algorithm: SigningAlgorithmSpec,
        signature: bytes,
    ) -> bool:
        """
        Verify ``signature`` over ``data`` with the public half of this key.

        :return: ``True`` when the signature is valid
        :raises ValidationException: when the underlying verify call raises a ``ValueError``
        :raises KMSInvalidSignatureException: when the signature does not match
        """
        hasher, wrapped_hasher = self._construct_sign_verify_hasher(signing_algorithm, message_type)
        is_ecdsa = signing_algorithm.startswith("ECDSA")
        try:
            if is_ecdsa:
                self.crypto_key.key.public_key().verify(signature, data, ec.ECDSA(wrapped_hasher))
            else:
                signature_padding = self._construct_sign_verify_padding(signing_algorithm, hasher)
                self.crypto_key.key.public_key().verify(
                    signature, data, signature_padding, wrapped_hasher
                )
        except ValueError as exc:
            raise ValidationException(str(exc))
        except InvalidSignature:
            # AWS itself raises this exception without any additional message.
            raise KMSInvalidSignatureException()
        return True
442

443
    def derive_shared_secret(self, public_key: bytes) -> bytes:
        """
        Derive a shared secret from this key's private key and a peer's ``public_key``.

        An ECDH exchange is performed and its result is run through HKDF (no salt, empty info).
        The hash is chosen by the key spec, and the output is as long as that hash's digest.

        :param public_key: the peer's public key, DER encoded
        :raises InvalidKeyUsageException: when the key spec is not one of the ECC specs handled here
        :raises ValidationException: when ``public_key`` cannot be loaded
        """
        key_spec = self.metadata.get("KeySpec")
        if key_spec == KeySpec.ECC_NIST_P256 or key_spec == KeySpec.ECC_SECG_P256K1:
            algorithm = hashes.SHA256()
        elif key_spec == KeySpec.ECC_NIST_P384:
            algorithm = hashes.SHA384()
        elif key_spec == KeySpec.ECC_NIST_P521:
            algorithm = hashes.SHA512()
        else:
            raise InvalidKeyUsageException(
                f"{self.metadata['Arn']} key usage is {self.metadata['KeyUsage']} which is not valid for DeriveSharedSecret."
            )

        # Deserialize public key from DER encoded data to EllipticCurvePublicKey.
        try:
            peer_public_key = load_der_public_key(public_key)
        except (UnsupportedAlgorithm, ValueError):
            raise ValidationException("")

        exchanged = self.crypto_key.key.exchange(ec.ECDH(), peer_public_key)

        # Perform shared secret derivation.
        kdf = HKDF(
            algorithm=algorithm,
            salt=None,
            info=b"",
            length=algorithm.digest_size,
            backend=default_backend(),
        )
        return kdf.derive(exchanged)
471

472
    # This method gets called when a key is replicated to another region. It's meant to populate the required metadata
473
    # fields in a new replica key.
474
    def replicate_metadata(
        self, replicate_key_request: ReplicateKeyRequest, account_id: str, replica_region: str
    ) -> None:
        """
        Rewrite this key's metadata so that it describes a replica in ``replica_region``.

        Sets the description from the request, recomputes the ARN for the replica region, appends
        the replica to the list of replica keys and marks the key as a REPLICA whose primary is the
        key the metadata described before.
        """
        self.metadata["Description"] = replicate_key_request.get("Description") or ""

        # Multi region keys have the same key ID for all replicas, but ARNs differ, as they include
        # actual regions of replicas. Hence the primary ARN has to be remembered before the ARN is
        # recalculated.
        primary_key_arn = self.metadata["Arn"]
        self.calculate_and_set_arn(account_id, replica_region)

        previous_config = self.metadata.get("MultiRegionConfiguration", {})
        replica_keys = previous_config.get("ReplicaKeys", [])
        replica_keys.append(MultiRegionKey(Arn=self.metadata["Arn"], Region=replica_region))
        primary_key_region = previous_config.get("PrimaryKey", {}).get("Region")

        primary_key = MultiRegionKey(Arn=primary_key_arn, Region=primary_key_region)
        self.metadata["MultiRegionConfiguration"] = MultiRegionConfiguration(
            MultiRegionKeyType=MultiRegionKeyType.REPLICA,
            PrimaryKey=primary_key,
            ReplicaKeys=replica_keys,
        )
499

500
    def _get_hmac_context(self, mac_algorithm: MacAlgorithmSpec) -> hmac.HMAC:
1✔
501
        if mac_algorithm == "HMAC_SHA_224":
1✔
502
            h = hmac.HMAC(self.crypto_key.key_material, hashes.SHA224())
1✔
503
        elif mac_algorithm == "HMAC_SHA_256":
1✔
504
            h = hmac.HMAC(self.crypto_key.key_material, hashes.SHA256())
1✔
505
        elif mac_algorithm == "HMAC_SHA_384":
1✔
506
            h = hmac.HMAC(self.crypto_key.key_material, hashes.SHA384())
1✔
507
        elif mac_algorithm == "HMAC_SHA_512":
1✔
508
            h = hmac.HMAC(self.crypto_key.key_material, hashes.SHA512())
1✔
509
        else:
UNCOV
510
            raise ValidationException(
×
511
                f"1 validation error detected: Value '{mac_algorithm}' at 'macAlgorithm' "
512
                f"failed to satisfy constraint: Member must satisfy enum value set: "
513
                f"[HMAC_SHA_384, HMAC_SHA_256, HMAC_SHA_224, HMAC_SHA_512]"
514
            )
515
        return h
1✔
516

517
    def _construct_sign_verify_hasher(
        self, signing_algorithm: SigningAlgorithmSpec, message_type: MessageType
    ) -> (
        Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512,
        Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512,
    ):
        """
        Pick the hash for ``signing_algorithm`` and the form in which it is handed to sign/verify.

        The hash is chosen by the first of "SHA_256", "SHA_384", "SHA_512" found in the algorithm
        name. The second element of the result is that same hash, wrapped into ``Prehashed`` when
        ``message_type`` is DIGEST.

        :return: ``(hasher, wrapped_hasher)``
        :raises ValidationException: when the algorithm name contains none of the hash markers
        """
        hash_by_marker = (
            ("SHA_256", hashes.SHA256),
            ("SHA_384", hashes.SHA384),
            ("SHA_512", hashes.SHA512),
        )
        for marker, hash_factory in hash_by_marker:
            if marker in signing_algorithm:
                hasher = hash_factory()
                break
        else:
            raise ValidationException(
                f"Unsupported hash type in SigningAlgorithm '{signing_algorithm}'"
            )

        if message_type == MessageType.DIGEST:
            return hasher, utils.Prehashed(hasher)
        return hasher, hasher
538

539
    def _construct_sign_verify_padding(
        self,
        signing_algorithm: SigningAlgorithmSpec,
        hasher: Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512,
    ) -> PKCS1v15 | PSS:
        """
        Build the RSA signature padding named by ``signing_algorithm``.

        :param signing_algorithm: algorithm name; has to start with "RSA" and contain either
            "PKCS" or "PSS"
        :param hasher: hash used for the MGF1 mask generation of PSS padding
        :return: ``PKCS1v15`` padding, or ``PSS`` padding with a digest-length salt
        :raises ValidationException: when no padding can be derived from ``signing_algorithm``.
            Previously ``None`` was returned in that case, which only surfaced later as an
            unrelated error inside the sign / verify call.
        """
        if signing_algorithm.startswith("RSA"):
            if "PKCS" in signing_algorithm:
                return padding.PKCS1v15()
            if "PSS" in signing_algorithm:
                return padding.PSS(mgf=padding.MGF1(hasher), salt_length=padding.PSS.DIGEST_LENGTH)

        # Fail the same way _construct_sign_verify_hasher does for an unknown hash, instead of
        # handing None on to the cryptography library.
        LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm)
        raise ValidationException(f"Unsupported padding in SigningAlgorithm '{signing_algorithm}'")
551

552
    # Not a comment, rather some possibly relevant links for the future.
553
    # https://docs.aws.amazon.com/kms/latest/developerguide/asymm-create-key.html
554
    # "You cannot create an elliptic curve key pair for encryption and decryption."
555
    # https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#asymmetric-keys-concept
556
    # "You can create asymmetric KMS keys that represent RSA key pairs for public key encryption or signing and
557
    # verification, or elliptic curve key pairs for signing and verification."
558
    #
559
    # A useful link with a cheat-sheet of what operations are supported by what types of keys:
560
    # https://docs.aws.amazon.com/kms/latest/developerguide/symm-asymm-compare.html
561
    #
562
    # https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#data-keys
563
    # "AWS KMS generates the data key. Then it encrypts a copy of the data key under a symmetric encryption KMS key that
564
    # you specify."
565
    #
566
    # Data keys are symmetric, data key pairs are asymmetric.
567
    def _populate_metadata(
        self, create_key_request: CreateKeyRequest, account_id: str, region: str
    ) -> None:
        """
        Create ``self.metadata`` from the creation request plus the automatically set fields.

        Reads ``self.tags`` for a custom key ID, so the tags have to be set before this is called.
        """
        metadata = KeyMetadata()
        self.metadata = metadata

        # Metadata fields coming from a creation request
        #
        # Tags are not included into the metadata. They might be present in a key creation request,
        # but the metadata only contains data displayed by DescribeKey. And tags are not there:
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html
        # "DescribeKey does not return the following information: ... Tags on the KMS key."
        requested_origin = create_key_request.get("Origin")
        metadata["Description"] = create_key_request.get("Description") or ""
        metadata["MultiRegion"] = create_key_request.get("MultiRegion") or False
        metadata["Origin"] = requested_origin or "AWS_KMS"
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html#KMS-CreateKey-request-CustomerMasterKeySpec
        # CustomerMasterKeySpec has been deprecated, still used for compatibility. Is replaced by
        # KeySpec. The meaning is the same, just the name differs.
        metadata["KeySpec"] = (
            create_key_request.get("KeySpec")
            or create_key_request.get("CustomerMasterKeySpec")
            or "SYMMETRIC_DEFAULT"
        )
        metadata["CustomerMasterKeySpec"] = metadata.get("KeySpec")
        metadata["KeyUsage"] = self._get_key_usage(
            create_key_request.get("KeyUsage"), metadata.get("KeySpec")
        )

        # Metadata fields AWS introduces automatically
        # keys of external origin start out disabled, in the PendingImport state
        usable_right_away = requested_origin != OriginType.EXTERNAL
        metadata["AWSAccountId"] = account_id
        metadata["CreationDate"] = datetime.datetime.now()
        metadata["Enabled"] = usable_right_away
        metadata["KeyManager"] = "CUSTOMER"
        metadata["KeyState"] = KeyState.Enabled if usable_right_away else KeyState.PendingImport

        if TAG_KEY_CUSTOM_ID in self.tags:
            # the _custom_id_ tag sets a user-defined KeyId for this key
            metadata["KeyId"] = self.tags[TAG_KEY_CUSTOM_ID].strip()
        elif metadata.get("MultiRegion"):
            # https://docs.aws.amazon.com/kms/latest/developerguide/multi-region-keys-overview.html
            # "Notice that multi-Region keys have a distinctive key ID that begins with mrk-. You
            # can use the mrk- prefix to identify MRKs programmatically."
            # The ID for MultiRegion keys also do not have dashes.
            metadata["KeyId"] = "mrk-" + uuid.uuid4().hex
        else:
            metadata["KeyId"] = str(uuid.uuid4())
        self.calculate_and_set_arn(account_id, region)

        algorithm_populators = (
            self._populate_encryption_algorithms,
            self._populate_signing_algorithms,
            self._populate_mac_algorithms,
        )
        for populate in algorithm_populators:
            populate(metadata.get("KeyUsage"), metadata.get("KeySpec"))

        if metadata["MultiRegion"]:
            metadata["MultiRegionConfiguration"] = MultiRegionConfiguration(
                MultiRegionKeyType=MultiRegionKeyType.PRIMARY,
                PrimaryKey=MultiRegionKey(Arn=metadata["Arn"], Region=region),
                ReplicaKeys=[],
            )
632

633
    def add_tags(self, tags: TagList) -> None:
        """
        Validate ``tags`` and merge them into the key's tags.

        Nothing happens for an empty or missing list. Tags are validated and applied one by one,
        in order; an existing tag with the same key is overwritten.

        :raises TagException: when the list has duplicate tag keys or more than 50 entries
        """
        # Just in case we get None from somewhere.
        if not tags:
            return

        tag_count = len(tags)
        distinct_keys = {entry["TagKey"] for entry in tags}
        if len(distinct_keys) < tag_count:
            raise TagException("Duplicate tag keys")
        if tag_count > 50:
            raise TagException("Too many tags")

        # Do not care if we overwrite an existing tag:
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_TagResource.html
        # "To edit a tag, specify an existing tag key and a new tag value."
        for position, entry in enumerate(tags, start=1):
            validate_tag(position, entry)
            tag_key, tag_value = entry.get("TagKey"), entry.get("TagValue")
            self.tags[tag_key] = tag_value
651

652
    def schedule_key_deletion(self, pending_window_in_days: int) -> None:
        """
        Disable the key, flag it as pending deletion and record when the deletion is due.

        :param pending_window_in_days: number of days, counted from now, until the deletion date
        """
        # TODO For MultiRegion keys, the status of replicas get set to "PendingDeletion", while the primary key
        #  becomes "PendingReplicaDeletion". Here we just set all keys to "PendingDeletion", as we do not have any
        #  notion of a primary key in LocalStack. Might be useful to improve it.
        #  https://docs.aws.amazon.com/kms/latest/developerguide/multi-region-keys-delete.html#primary-delete
        state_change = {"Enabled": False, "KeyState": "PendingDeletion"}
        self.metadata.update(state_change)

        deletion_window = datetime.timedelta(days=pending_window_in_days)
        self.metadata["DeletionDate"] = datetime.datetime.now() + deletion_window
662

663
    def _update_key_rotation_date(self) -> None:
        """
        Move ``next_rotation_date`` forward if it is unset or already in the past.

        The new date is the current time plus ``rotation_period_in_days`` days. A date that is
        still in the future is left unchanged.
        """
        scheduled = self.next_rotation_date
        if scheduled and scheduled >= datetime.datetime.now():
            # The stored date has not been reached yet, nothing to do.
            return

        rotation_period = datetime.timedelta(days=self.rotation_period_in_days)
        self.next_rotation_date = datetime.datetime.now() + rotation_period
668

669
    # An example of what the whole policy should look like:
670
    # https://docs.aws.amazon.com/kms/latest/developerguide/key-policy-overview.html
671
    # The default statement is here:
672
    # https://docs.aws.amazon.com/kms/latest/developerguide/key-policy-default.html#key-policy-default-allow-root-enable-iam
673
    def _get_default_key_policy(self, account_id: str, region: str) -> str:
        """
        Build the default key policy and return it serialized as a JSON string.

        The policy contains a single statement that allows ``kms:*`` on all resources for the
        root principal of ``account_id``. The ARN partition is derived from ``region``.

        :param account_id: account whose root principal is granted access
        :param region: region passed to ``get_partition`` to pick the ARN partition
        :return: the policy document as a JSON string
        """
        root_principal_arn = f"arn:{get_partition(region)}:iam::{account_id}:root"
        allow_root_statement = {
            "Sid": "Enable IAM User Permissions",
            "Effect": "Allow",
            "Principal": {"AWS": root_principal_arn},
            "Action": "kms:*",
            "Resource": "*",
        }
        policy_document = {
            "Version": "2012-10-17",
            "Id": "key-default-1",
            "Statement": [allow_root_statement],
        }
        return json.dumps(policy_document)
689

690
    def _populate_encryption_algorithms(self, key_usage: str, key_spec: str) -> None:
        """
        Set ``EncryptionAlgorithms`` in the key metadata for encryption/decryption keys.

        Keys created for any other usage are left untouched, as the field is only meaningful
        for keys that are meant to encrypt and decrypt.

        :param key_usage: usage of the key; only ``ENCRYPT_DECRYPT`` has an effect
        :param key_spec: ``SYMMETRIC_DEFAULT`` selects the symmetric algorithm, any other value
            selects the two RSAES OAEP algorithms
        """
        if key_usage == "ENCRYPT_DECRYPT":
            self.metadata["EncryptionAlgorithms"] = (
                ["SYMMETRIC_DEFAULT"]
                if key_spec == "SYMMETRIC_DEFAULT"
                else ["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"]
            )
700

701
    def _populate_signing_algorithms(self, key_usage: str, key_spec: str) -> None:
        """
        Set ``SigningAlgorithms`` in the key metadata for signing/verification keys.

        Keys created for any other usage are left untouched, as the field is only meaningful
        for keys that are meant to sign and verify.

        :param key_usage: usage of the key; only ``SIGN_VERIFY`` has an effect
        :param key_spec: the listed ECC specs map to a single ECDSA algorithm each, any other
            value gets the RSASSA PKCS1 v1.5 and PSS algorithms
        """
        if key_usage != "SIGN_VERIFY":
            return

        ecdsa_algorithm_by_spec = {
            "ECC_NIST_P256": "ECDSA_SHA_256",
            "ECC_SECG_P256K1": "ECDSA_SHA_256",
            "ECC_NIST_P384": "ECDSA_SHA_384",
            "ECC_NIST_P521": "ECDSA_SHA_512",
        }
        ecdsa_algorithm = ecdsa_algorithm_by_spec.get(key_spec)
        if ecdsa_algorithm is not None:
            self.metadata["SigningAlgorithms"] = [ecdsa_algorithm]
            return

        # Everything that is not one of the ECC specs above is given the RSA signing algorithms.
        self.metadata["SigningAlgorithms"] = [
            "RSASSA_PKCS1_V1_5_SHA_256",
            "RSASSA_PKCS1_V1_5_SHA_384",
            "RSASSA_PKCS1_V1_5_SHA_512",
            "RSASSA_PSS_SHA_256",
            "RSASSA_PSS_SHA_384",
            "RSASSA_PSS_SHA_512",
        ]
722

723
    def _populate_mac_algorithms(self, key_usage: str, key_spec: str) -> None:
        """
        Set ``MacAlgorithms`` in the key metadata for MAC keys.

        Nothing is written when the key usage is not ``GENERATE_VERIFY_MAC`` or when
        ``key_spec`` is not one of the four HMAC specs.

        :param key_usage: usage of the key; only ``GENERATE_VERIFY_MAC`` has an effect
        :param key_spec: HMAC key spec selecting the matching HMAC-SHA algorithm
        """
        if key_usage != "GENERATE_VERIFY_MAC":
            return

        mac_algorithm_by_spec = {
            "HMAC_224": "HMAC_SHA_224",
            "HMAC_256": "HMAC_SHA_256",
            "HMAC_384": "HMAC_SHA_384",
            "HMAC_512": "HMAC_SHA_512",
        }
        mac_algorithm = mac_algorithm_by_spec.get(key_spec)
        if mac_algorithm is not None:
            self.metadata["MacAlgorithms"] = [mac_algorithm]
734

735
    def _get_key_usage(self, request_key_usage: str, key_spec: str) -> str:
        """
        Determine the effective key usage for a key from the requested usage and the key spec.

        :param request_key_usage: key usage given in the request, may be ``None``
        :param key_spec: key spec of the key
        :return: ``GENERATE_VERIFY_MAC`` for HMAC key specs, the requested usage otherwise,
            falling back to ``ENCRYPT_DECRYPT`` when no usage was requested
        :raises ValidationException: if an HMAC key spec comes without a usage or with a usage
            other than ``GENERATE_VERIFY_MAC``, or if ``KEY_AGREEMENT`` is requested for a key
            spec outside the allowed ECC/SM2 specs
        """
        if key_spec in HMAC_RANGE_KEY_LENGTHS:
            # HMAC keys must explicitly be requested with the MAC usage.
            if request_key_usage is None:
                raise ValidationException(
                    "You must specify a KeyUsage value for all KMS keys except for symmetric encryption keys."
                )
            if request_key_usage != KeyUsageType.GENERATE_VERIFY_MAC:
                raise ValidationException(
                    f"1 validation error detected: Value '{request_key_usage}' at 'keyUsage' "
                    "failed to satisfy constraint: Member must satisfy enum value set: "
                    "[ENCRYPT_DECRYPT, SIGN_VERIFY, GENERATE_VERIFY_MAC]"
                )
            return KeyUsageType.GENERATE_VERIFY_MAC

        if request_key_usage != KeyUsageType.KEY_AGREEMENT:
            return request_key_usage or "ENCRYPT_DECRYPT"

        # Key agreement is only accepted for the key specs listed here.
        key_agreement_specs = (
            KeySpec.ECC_NIST_P256,
            KeySpec.ECC_NIST_P384,
            KeySpec.ECC_NIST_P521,
            KeySpec.ECC_SECG_P256K1,
            KeySpec.SM2,
        )
        if key_spec in key_agreement_specs:
            return request_key_usage
        raise ValidationException(
            f"KeyUsage {request_key_usage} is not compatible with KeySpec {key_spec}"
        )
764

765
    def rotate_key_on_demand(self):
        """
        Rotate the key material of this key.

        The current key material is appended to ``previous_keys`` and a new symmetric crypto
        key becomes the current one. ``CurrentKeyMaterialId`` in the metadata is updated to
        match the new material.

        :raises LimitExceededException: if ``previous_keys`` already holds
            ``ON_DEMAND_ROTATION_LIMIT`` or more entries
        """
        if len(self.previous_keys) >= ON_DEMAND_ROTATION_LIMIT:
            key_arn = self.metadata["Arn"]
            raise LimitExceededException(
                "The on-demand rotations limit has been reached for the given keyId. "
                f"No more on-demand rotations can be performed for this key: {key_arn}"
            )

        # Read both materials before the crypto key gets replaced below.
        retired_material = self.crypto_key.key_material
        upcoming_material = self.crypto_key.pending_key_material
        self.previous_keys.append(retired_material)

        # If there is no pending material stored on the key, then key material will be generated.
        self.crypto_key = KmsCryptoKey(KeySpec.SYMMETRIC_DEFAULT, upcoming_material)
        new_material_id = self.generate_key_material_id(self.crypto_key.key_material)
        self.metadata["CurrentKeyMaterialId"] = new_material_id
781

782

783
class KmsGrant:
    """
    Model of a KMS grant: the grant's metadata plus the grant token handed out on creation.
    """

    # The AWS documentation does not describe a dedicated metadata object for grants the way it describes
    # KeyMetadata for keys. Judging by the CreateGrant, ListGrants etc. operations, grants nevertheless carry a
    # set of fields just like keys do, so for consistency they are kept in `metadata` here as well.
    metadata: dict
    # The token is kept outside of the metadata because its use is narrower and more specific:
    # https://docs.aws.amazon.com/kms/latest/developerguide/grant-manage.html#using-grant-token
    # A token refers to a grant in the short period right after the grant was created. KMS may normally need up
    # to 5 minutes to make a new grant available, and referring to the grant by its GrantId might not work in
    # that window, which is what tokens are for. Tokens could possibly be used afterwards as well, but as a
    # CreateGrant operation is the only way to obtain one (see below), it is unlikely that someone stores a token
    # to use it later.
    #
    # https://docs.aws.amazon.com/kms/latest/developerguide/grants.html#grant_token
    # "CreateGrant is the only operation that returns a grant token. You cannot get a grant token from any other
    # AWS KMS operation or from the CloudTrail log event for the CreateGrant operation. The ListGrants and
    # ListRetirableGrants operations return the grant ID, but not a grant token."
    #
    # A grant usually might have several unique tokens. For simplicity a single token is modelled here.
    token: str

    def __init__(self, create_grant_request: CreateGrantRequest, account_id: str, region_name: str):
        """
        Create the grant from a CreateGrant request.

        :param create_grant_request: request whose entries are copied into the grant metadata
        :param account_id: account ID, used for the key ARN and encoded into the token
        :param region_name: region name, used for the key ARN and encoded into the token
        """
        self.metadata = dict(create_grant_request)

        # The request may reference the key by ARN already; otherwise the ARN is built from the key ID.
        key_reference = self.metadata["KeyId"]
        self.metadata["KeyArn"] = (
            key_reference
            if is_valid_key_arn(key_reference)
            else kms_key_arn(key_reference, account_id, region_name)
        )

        self.metadata["GrantId"] = long_uid()
        self.metadata["CreationDate"] = datetime.datetime.now()
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_GrantListEntry.html
        # "If a name was provided in the CreateGrant request, that name is returned. Otherwise this value is null."
        # The examples in the AWS docs, however, show the Name field being present with an empty string value:
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListGrants.html#API_ListGrants_Examples
        self.metadata.setdefault("Name", "")

        # The grant token encodes account ID and region, so that the grant can be located when it is being
        # retired by the grant principal. The token is made of the account ID, the region name and a UUID,
        # joined with ':' and encoded with base64.
        plain_token = ":".join((account_id, region_name, long_uid()))
        self.token = to_str(base64.b64encode(to_bytes(plain_token)))
828

829

830
class KmsAlias:
    """
    Model of a KMS alias, with its data kept in a ``metadata`` dict.
    """

    # As for grants (see the comment on KmsGrant), no specific object modelling the metadata of KMS aliases is
    # mentioned anywhere. There is data that amounts to metadata though, so it is modelled similar to KeyMetadata
    # for keys.
    metadata: dict

    def __init__(
        self,
        create_alias_request: CreateAliasRequest = None,
        account_id: str = None,
        region: str = None,
    ):
        """
        Create the alias from a CreateAlias request.

        :param create_alias_request: request providing ``AliasName`` and ``TargetKeyId``; an
            empty request is used when omitted
        :param account_id: account ID used to build the alias ARN
        :param region: region used to build the alias ARN
        """
        request = create_alias_request or CreateAliasRequest()
        alias_name = request.get("AliasName")
        self.metadata = {
            "AliasName": alias_name,
            "TargetKeyId": request.get("TargetKeyId"),
        }

        # A new alias starts with identical creation and last-update dates.
        self.update_date_of_last_update()
        self.metadata["CreationDate"] = self.metadata["LastUpdateDate"]
        self.metadata["AliasArn"] = kms_alias_arn(alias_name, account_id, region)

    def update_date_of_last_update(self):
        """Set ``LastUpdateDate`` in the metadata to the current time."""
        self.metadata["LastUpdateDate"] = datetime.datetime.now()
851

852

853
@dataclass
class KeyImportState:
    """
    Record bundling a key with the import token and wrapping algorithm of a key import.

    NOTE(review): presumably created when import parameters are requested for a key and
    looked up again by import token — confirm against the provider code.
    """

    # ID of the key the import refers to
    key_id: str
    # token identifying this import
    import_token: str
    # name of the wrapping algorithm chosen for the import
    wrapping_algo: str
    # key object associated with the import
    key: KmsKey
859

860

861
class KmsStore(BaseStore):
    """
    KMS state container holding keys, grants, aliases and key imports.
    """

    # key ID -> key
    keys: dict[str, KmsKey] = LocalAttribute(default=dict)

    # According to AWS documentation on grants https://docs.aws.amazon.com/kms/latest/APIReference/API_RetireGrant.html
    # "Cross-account use: Yes. You can retire a grant on a KMS key in a different AWS account."

    # grant ID -> grant
    grants: dict[str, KmsGrant] = LocalAttribute(default=dict)

    # (grant name, key ID) -> grant ID; grant names are used for idempotency
    grant_names: dict[tuple[str, str], str] = LocalAttribute(default=dict)

    # grant token -> grant ID
    grant_tokens: dict[str, str] = LocalAttribute(default=dict)

    # alias name -> alias
    aliases: dict[str, KmsAlias] = LocalAttribute(default=dict)

    # import token -> import data
    imports: dict[str, KeyImportState] = LocalAttribute(default=dict)
882

883

884
# Module-level bundle of KMS stores, created for the "kms" service name with KmsStore as the store class.
# NOTE(review): presumably resolves to one KmsStore per account and region — confirm against AccountRegionBundle.
kms_stores = AccountRegionBundle("kms", KmsStore)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc