• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20449761985

22 Dec 2025 09:22PM UTC coverage: 86.912% (-0.008%) from 86.92%
20449761985

push

github

web-flow
APIGW: improve store typing (#13552)

9 of 9 new or added lines in 1 file covered. (100.0%)

130 existing lines in 7 files now uncovered.

70016 of 80560 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.3
/localstack-core/localstack/services/kms/provider.py
1
import base64
1✔
2
import copy
1✔
3
import datetime
1✔
4
import logging
1✔
5
import os
1✔
6

7
from cbor2 import loads as cbor2_loads
1✔
8
from cryptography.exceptions import InvalidTag
1✔
9
from cryptography.hazmat.backends import default_backend
1✔
10
from cryptography.hazmat.primitives import hashes, keywrap
1✔
11
from cryptography.hazmat.primitives.asymmetric import padding
1✔
12
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
1✔
13
from cryptography.hazmat.primitives.serialization import load_der_public_key
1✔
14

15
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
16
from localstack.aws.api.kms import (
1✔
17
    AlgorithmSpec,
18
    AlreadyExistsException,
19
    BackingKeyIdType,
20
    CancelKeyDeletionRequest,
21
    CancelKeyDeletionResponse,
22
    CiphertextType,
23
    CreateAliasRequest,
24
    CreateGrantRequest,
25
    CreateGrantResponse,
26
    CreateKeyRequest,
27
    CreateKeyResponse,
28
    DataKeyPairSpec,
29
    DateType,
30
    DecryptResponse,
31
    DeleteAliasRequest,
32
    DeleteImportedKeyMaterialResponse,
33
    DeriveSharedSecretResponse,
34
    DescribeKeyRequest,
35
    DescribeKeyResponse,
36
    DisabledException,
37
    DisableKeyRequest,
38
    DisableKeyRotationRequest,
39
    EnableKeyRequest,
40
    EnableKeyRotationRequest,
41
    EncryptionAlgorithmSpec,
42
    EncryptionContextType,
43
    EncryptResponse,
44
    ExpirationModelType,
45
    GenerateDataKeyPairResponse,
46
    GenerateDataKeyPairWithoutPlaintextResponse,
47
    GenerateDataKeyRequest,
48
    GenerateDataKeyResponse,
49
    GenerateDataKeyWithoutPlaintextRequest,
50
    GenerateDataKeyWithoutPlaintextResponse,
51
    GenerateMacRequest,
52
    GenerateMacResponse,
53
    GenerateRandomRequest,
54
    GenerateRandomResponse,
55
    GetKeyPolicyRequest,
56
    GetKeyPolicyResponse,
57
    GetKeyRotationStatusRequest,
58
    GetKeyRotationStatusResponse,
59
    GetParametersForImportResponse,
60
    GetPublicKeyResponse,
61
    GrantIdType,
62
    GrantTokenList,
63
    GrantTokenType,
64
    ImportKeyMaterialResponse,
65
    ImportType,
66
    IncorrectKeyException,
67
    InvalidCiphertextException,
68
    InvalidGrantIdException,
69
    InvalidKeyUsageException,
70
    KeyAgreementAlgorithmSpec,
71
    KeyIdType,
72
    KeyMaterialDescriptionType,
73
    KeySpec,
74
    KeyState,
75
    KeyUsageType,
76
    KmsApi,
77
    KMSInvalidStateException,
78
    LimitType,
79
    ListAliasesResponse,
80
    ListGrantsRequest,
81
    ListGrantsResponse,
82
    ListKeyPoliciesRequest,
83
    ListKeyPoliciesResponse,
84
    ListKeysRequest,
85
    ListKeysResponse,
86
    ListResourceTagsRequest,
87
    ListResourceTagsResponse,
88
    MacAlgorithmSpec,
89
    MarkerType,
90
    MultiRegionKey,
91
    MultiRegionKeyType,
92
    NotFoundException,
93
    NullableBooleanType,
94
    OriginType,
95
    PlaintextType,
96
    PrincipalIdType,
97
    PublicKeyType,
98
    PutKeyPolicyRequest,
99
    RecipientInfo,
100
    ReEncryptResponse,
101
    ReplicateKeyRequest,
102
    ReplicateKeyResponse,
103
    RotateKeyOnDemandRequest,
104
    RotateKeyOnDemandResponse,
105
    ScheduleKeyDeletionRequest,
106
    ScheduleKeyDeletionResponse,
107
    SignRequest,
108
    SignResponse,
109
    TagResourceRequest,
110
    UnsupportedOperationException,
111
    UntagResourceRequest,
112
    UpdateAliasRequest,
113
    UpdateKeyDescriptionRequest,
114
    VerifyMacRequest,
115
    VerifyMacResponse,
116
    VerifyRequest,
117
    VerifyResponse,
118
    WrappingKeySpec,
119
)
120
from localstack.services.kms.exceptions import ValidationException
1✔
121
from localstack.services.kms.models import (
1✔
122
    MULTI_REGION_PATTERN,
123
    PATTERN_UUID,
124
    RESERVED_ALIASES,
125
    KeyImportState,
126
    KmsAlias,
127
    KmsCryptoKey,
128
    KmsGrant,
129
    KmsKey,
130
    KmsStore,
131
    deserialize_ciphertext_blob,
132
    kms_stores,
133
)
134
from localstack.services.kms.utils import (
1✔
135
    execute_dry_run_capable,
136
    is_valid_key_arn,
137
    parse_key_arn,
138
    validate_alias_name,
139
)
140
from localstack.services.plugins import ServiceLifecycleHook
1✔
141
from localstack.state import StateVisitor
1✔
142
from localstack.utils.aws.arns import get_partition, kms_alias_arn, parse_arn
1✔
143
from localstack.utils.collections import PaginatedList
1✔
144
from localstack.utils.common import select_attributes
1✔
145
from localstack.utils.crypto import pkcs7_envelope_encrypt
1✔
146
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
147

148
LOG = logging.getLogger(__name__)
1✔
149

150
# Names of the KMS operations this module treats as valid.
# NOTE(review): presumably the operations that may be listed in a grant - confirm against usages.
VALID_OPERATIONS = [
    # cryptographic operations
    "CreateKey",
    "Decrypt",
    "Encrypt",
    "GenerateDataKey",
    "GenerateDataKeyWithoutPlaintext",
    "ReEncryptFrom",
    "ReEncryptTo",
    "Sign",
    "Verify",
    "GetPublicKey",
    # grant management and key inspection
    "CreateGrant",
    "RetireGrant",
    "DescribeKey",
    # data key pairs
    "GenerateDataKeyPair",
    "GenerateDataKeyPairWithoutPlaintext",
]
168

169

170
class ValidationError(CommonServiceException):
    """
    General validation error reported under the ``ValidationError`` error code.

    The error type is defined in the AWS docs, but it is not part of the botocore spec.
    """

    def __init__(self, message=None):
        error_code = "ValidationError"
        super().__init__(error_code, message=message)
×
175

176

177
# For all operations, the constraints on key states are based on
178
# https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
179
class KmsProvider(KmsApi, ServiceLifecycleHook):
1✔
180
    """
181
    The LocalStack Key Management Service (KMS) provider.
182

183
    Cross-account access is supported by following operations where key ID belonging
184
    to another account can be used with the key ARN.
185
    - CreateGrant
186
    - DescribeKey
187
    - GetKeyRotationStatus
188
    - GetPublicKey
189
    - ListGrants
190
    - RetireGrant
191
    - RevokeGrant
192
    - Decrypt
193
    - Encrypt
194
    - GenerateDataKey
195
    - GenerateDataKeyPair
196
    - GenerateDataKeyPairWithoutPlaintext
197
    - GenerateDataKeyWithoutPlaintext
198
    - GenerateMac
199
    - ReEncrypt
200
    - Sign
201
    - Verify
202
    - VerifyMac
203
    """
204

205
    def accept_state_visitor(self, visitor: StateVisitor):
        """Hand the KMS stores over to the given state visitor."""
        visitor.visit(kms_stores)
×
207

208
    #
209
    # Helpers
210
    #
211

212
    @staticmethod
    def _get_store(account_id: str, region_name: str) -> KmsStore:
        """Return the KMS store of the given account and region."""
        account_stores = kms_stores[account_id]
        return account_stores[region_name]
1✔
215

216
    @staticmethod
    def _create_kms_alias(account_id: str, region_name: str, request: CreateAliasRequest):
        """
        Build a ``KmsAlias`` from the request and register it in the account/region store.

        The alias is stored under the request's ``AliasName``.
        """
        store = kms_stores[account_id][region_name]
        new_alias = KmsAlias(request, account_id, region_name)
        store.aliases[request.get("AliasName")] = new_alias
1✔
222

223
    @staticmethod
    def _create_kms_key(
        account_id: str, region_name: str, request: CreateKeyRequest = None
    ) -> KmsKey:
        """
        Build a ``KmsKey`` from the request and register it in the account/region store.

        :param account_id: account owning the key
        :param region_name: region the key is created in
        :param request: CreateKey request used to construct the key
        :return: the newly created key, stored under its ``KeyId``
        """
        store = kms_stores[account_id][region_name]
        new_key = KmsKey(request, account_id, region_name)
        store.keys[new_key.metadata["KeyId"]] = new_key
        return new_key
1✔
232

233
    @staticmethod
    def _get_key_id_from_any_id(account_id: str, region_name: str, some_id: str) -> str:
        """
        Resolve a KMS key ID by using one of the following identifiers:
        - key ID
        - key ARN
        - key alias
        - key alias ARN

        :param account_id: account whose store is searched
        :param region_name: region whose store is searched
        :param some_id: any of the identifier types listed above
        :return: the ID of an existing key in the store
        :raises NotFoundException: if a key ARN points to another region, the alias is unknown,
            the key ID is malformed, or the key does not exist
        :raises ValueError: if an ARN is given that is neither a key ARN nor an alias ARN
        """
        alias_name = key_id = key_arn = None

        # Step 1: classify the identifier
        if not some_id.startswith("arn:"):
            if some_id.startswith("alias/"):
                alias_name = some_id
            else:
                key_id = some_id
        elif ":alias/" in some_id:
            alias_name = "alias/" + some_id.split(":alias/")[1]
        elif ":key/" in some_id:
            key_arn = some_id
            key_id = key_arn.split(":key/")[1]
            arn_region = parse_arn(key_arn)["region"]
            if arn_region != region_name:
                raise NotFoundException(f"Invalid arn {arn_region}")
        else:
            raise ValueError(
                f"Supplied value of {some_id} is an ARN, but neither of a KMS key nor of a KMS key "
                f"alias"
            )

        store = kms_stores[account_id][region_name]

        # Step 2: an alias is translated into the ID of its target key
        if alias_name:
            KmsProvider._create_alias_if_reserved_and_not_exists(
                account_id, region_name, alias_name
            )
            if alias_name not in store.aliases:
                raise NotFoundException(f"Unable to find KMS alias with name {alias_name}")
            key_id = store.aliases[alias_name].metadata["TargetKeyId"]

        # Step 3: validate the key ID format. Regular key IDs are UUIDs, while multi-region key IDs
        # start with 'mrk-' followed by 32 hex chars.
        if not (PATTERN_UUID.match(key_id) or MULTI_REGION_PATTERN.match(key_id)):
            raise NotFoundException(f"Invalid keyId '{key_id}'")

        # Step 4: the key has to exist in the store
        if key_id in store.keys:
            return key_id

        missing_key_arn = (
            key_arn
            or f"arn:{get_partition(region_name)}:kms:{region_name}:{account_id}:key/{key_id}"
        )
        raise NotFoundException(f"Key '{missing_key_arn}' does not exist")
1✔
290

291
    @staticmethod
    def _create_alias_if_reserved_and_not_exists(
        account_id: str, region_name: str, alias_name: str
    ):
        """
        Lazily create a reserved alias together with a fresh key backing it.

        Nothing happens when the alias name is not in ``RESERVED_ALIASES`` or the alias is
        already present in the store.
        """
        store = kms_stores[account_id][region_name]
        is_reserved = alias_name in RESERVED_ALIASES
        if is_reserved and alias_name not in store.aliases:
            backing_key = KmsProvider._create_kms_key(account_id, region_name, {})
            alias_request = CreateAliasRequest(
                AliasName=alias_name,
                TargetKeyId=backing_key.metadata.get("KeyId"),
            )
            KmsProvider._create_kms_alias(account_id, region_name, alias_request)
×
306

307
    # While in AWS keys have more than Enabled, Disabled and PendingDeletion states, we currently only model these 3
308
    # in LocalStack, so this function is limited to them.
309
    #
310
    # The current default values are based on most of the operations working in AWS with enabled keys, but failing with
311
    # disabled and those pending deletion.
312
    #
313
    # If we decide to use the other states as well, we might want to come up with a better key state validation per
314
    # operation. Can consult this page for what states are supported by various operations:
315
    # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
316
    @staticmethod
    def _get_kms_key(
        account_id: str,
        region_name: str,
        any_type_of_key_id: str,
        any_key_state_allowed: bool = False,
        enabled_key_allowed: bool = True,
        disabled_key_allowed: bool = False,
        pending_deletion_key_allowed: bool = False,
    ) -> KmsKey:
        """
        Look up a key by any supported identifier and enforce the permitted key states.

        :param account_id: account whose store is searched
        :param region_name: region whose store is searched
        :param any_type_of_key_id: key ID, key ARN, alias name or alias ARN
        :param any_key_state_allowed: shortcut that permits all three modelled states
        :param enabled_key_allowed: whether a key in state ``Enabled`` is acceptable
        :param disabled_key_allowed: whether a key in state ``Disabled`` is acceptable
        :param pending_deletion_key_allowed: whether a key in state ``PendingDeletion`` is acceptable
        :return: the matching key
        :raises ValueError: if every key state is prohibited by the flags
        :raises DisabledException: if the key is disabled and that is not allowed
        :raises KMSInvalidStateException: if the key is pending deletion or enabled and that
            state is not allowed
        """
        store = kms_stores[account_id][region_name]

        if any_key_state_allowed:
            enabled_key_allowed = disabled_key_allowed = pending_deletion_key_allowed = True
        if not any((enabled_key_allowed, disabled_key_allowed, pending_deletion_key_allowed)):
            raise ValueError("A key is requested, but all possible key states are prohibited")

        key_id = KmsProvider._get_key_id_from_any_id(account_id, region_name, any_type_of_key_id)
        key = store.keys[key_id]
        key_state = key.metadata.get("KeyState")
        key_arn = key.metadata.get("Arn")

        if key_state == "Disabled" and not disabled_key_allowed:
            raise DisabledException(f"{key_arn} is disabled.")
        if key_state == "PendingDeletion" and not pending_deletion_key_allowed:
            raise KMSInvalidStateException(f"{key_arn} is pending deletion.")
        if key_state == "Enabled" and not enabled_key_allowed:
            raise KMSInvalidStateException(
                f"{key_arn} is enabled, but the operation doesn't support "
                f"such a state"
            )
        return key
1✔
348

349
    @staticmethod
    def _get_kms_alias(account_id: str, region_name: str, alias_name_or_arn: str) -> KmsAlias:
        """
        Return the alias identified by an alias name or an alias ARN.

        :raises ValidationException: if an ARN is passed that does not contain ``:alias/``
        :raises NotFoundException: if the alias is not present in the store
        """
        store = kms_stores[account_id][region_name]

        if alias_name_or_arn.startswith("arn:"):
            if ":alias/" not in alias_name_or_arn:
                raise ValidationException(f"{alias_name_or_arn} is not a valid alias ARN")
            alias_name = "alias/" + alias_name_or_arn.split(":alias/")[1]
        else:
            alias_name = alias_name_or_arn

        validate_alias_name(alias_name)

        if alias_name in store.aliases:
            return store.aliases.get(alias_name)

        # AWS itself uses AliasArn instead of AliasName in this exception.
        missing_alias_arn = kms_alias_arn(alias_name, account_id, region_name)
        raise NotFoundException(f"Alias {missing_alias_arn} is not found.")
1✔
368

369
    @staticmethod
    def _parse_key_id(key_id_or_arn: str, context: RequestContext) -> tuple[str, str, str]:
        """
        Return locator attributes (account ID, region_name, key ID) of a given KMS key.

        A valid key ARN supplies all three values itself; for anything else the account and
        region are taken from the request context and the identifier is passed through as is.

        :param key_id_or_arn: KMS key ID or ARN
        :param context: request context
        :return: Tuple of account ID, region name and key ID
        :raises NotFoundException: if the ARN's region differs from the region of the request
        """
        if not is_valid_key_arn(key_id_or_arn):
            return context.account_id, context.region, key_id_or_arn

        account_id, region_name, key_id = parse_key_arn(key_id_or_arn)
        if region_name != context.region:
            raise NotFoundException(f"Invalid arn {region_name}")
        return account_id, region_name, key_id
1✔
387

388
    @staticmethod
    def _is_rsa_spec(key_spec: str) -> bool:
        """Tell whether the key spec is one of the RSA specs (2048, 3072 or 4096 bits)."""
        rsa_specs = (KeySpec.RSA_2048, KeySpec.RSA_3072, KeySpec.RSA_4096)
        return key_spec in rsa_specs
1✔
391

392
    #
393
    # Operation Handlers
394
    #
395

396
    @handler("CreateKey", expand=False)
    def create_key(
        self,
        context: RequestContext,
        request: CreateKeyRequest = None,
    ) -> CreateKeyResponse:
        """Create a key in the account and region of the request and return its metadata."""
        created_key = self._create_kms_key(context.account_id, context.region, request)
        return CreateKeyResponse(KeyMetadata=created_key.metadata)
1✔
404

405
    @handler("ScheduleKeyDeletion", expand=False)
    def schedule_key_deletion(
        self, context: RequestContext, request: ScheduleKeyDeletionRequest
    ) -> ScheduleKeyDeletionResponse:
        """
        Schedule the deletion of an enabled or disabled key.

        ``PendingWindowInDays`` defaults to 30 and has to lie between 7 and 30 (inclusive).

        :raises ValidationException: if the pending window is out of range
        """
        pending_window = int(request.get("PendingWindowInDays", 30))
        if not 7 <= pending_window <= 30:
            raise ValidationException(
                f"PendingWindowInDays should be between 7 and 30, but it is {pending_window}"
            )

        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key.schedule_key_deletion(pending_window)

        response_fields = select_attributes(key.metadata, ["DeletionDate", "KeyId", "KeyState"])
        response_fields["PendingWindowInDays"] = pending_window
        return ScheduleKeyDeletionResponse(**response_fields)
1✔
426

427
    @handler("CancelKeyDeletion", expand=False)
    def cancel_key_deletion(
        self, context: RequestContext, request: CancelKeyDeletionRequest
    ) -> CancelKeyDeletionResponse:
        """
        Cancel the scheduled deletion of a key.

        Only keys pending deletion are accepted. Afterwards the key is in state ``Disabled``
        and its ``DeletionDate`` is cleared.
        """
        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=False,
            pending_deletion_key_allowed=True,
        )
        key.metadata.update({"KeyState": KeyState.Disabled, "DeletionDate": None})
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_CancelKeyDeletion.html#API_CancelKeyDeletion_ResponseElements
        # "The Amazon Resource Name (key ARN) of the KMS key whose deletion is canceled."
        key_arn = key.metadata.get("Arn")
        return CancelKeyDeletionResponse(KeyId=key_arn)
1✔
443

444
    @handler("DisableKey", expand=False)
    def disable_key(self, context: RequestContext, request: DisableKeyRequest) -> None:
        """Put an enabled or disabled key into the ``Disabled`` state."""
        # Technically, AWS allows DisableKey for keys that are already disabled.
        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key.metadata.update({"KeyState": KeyState.Disabled, "Enabled": False})
1✔
456

457
    @handler("EnableKey", expand=False)
    def enable_key(self, context: RequestContext, request: EnableKeyRequest) -> None:
        """Put an enabled or disabled key into the ``Enabled`` state."""
        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key.metadata.update({"KeyState": KeyState.Enabled, "Enabled": True})
1✔
468

469
    @handler("ListKeys", expand=False)
    def list_keys(self, context: RequestContext, request: ListKeysRequest) -> ListKeysResponse:
        """Return one page of the keys stored for the account and region of the request."""
        store = self._get_store(context.account_id, context.region)
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeys.html#API_ListKeys_ResponseSyntax
        # Out of whole KeyMetadata only two fields are present in the response.
        key_entries = [
            {"KeyId": key.metadata["KeyId"], "KeyArn": key.metadata["Arn"]}
            for key in store.keys.values()
        ]
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeys.html#API_ListKeys_RequestParameters
        # Regarding the default value of Limit: "If you do not include a value, it defaults to 100."
        page, next_token = PaginatedList(key_entries).get_page(
            lambda entry: entry.get("KeyId"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 100),
        )
        if next_token:
            return ListKeysResponse(Keys=page, NextMarker=next_token, Truncated=True)
        return ListKeysResponse(Keys=page)
1✔
488

489
    @handler("DescribeKey", expand=False)
    def describe_key(
        self, context: RequestContext, request: DescribeKeyRequest
    ) -> DescribeKeyResponse:
        """Return the metadata of a key, regardless of the state the key is in."""
        locator = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(*locator, any_key_state_allowed=True)
        return DescribeKeyResponse(KeyMetadata=key.metadata)
1✔
496

497
    @handler("ReplicateKey", expand=False)
    def replicate_key(
        self, context: RequestContext, request: ReplicateKeyRequest
    ) -> ReplicateKeyResponse:
        """
        Copy a multi-region primary key into the store of another region of the same account.

        :raises UnsupportedOperationException: if the key is not a multi-region key, or is not
            the primary key
        :raises AlreadyExistsException: if the replica region already holds a key with this ID
        """
        account_id = context.account_id
        source_key = self._get_kms_key(account_id, context.region, request.get("KeyId"))
        source_metadata = source_key.metadata
        key_id = source_metadata.get("KeyId")
        key_arn = source_metadata.get("Arn")
        if not source_metadata.get("MultiRegion"):
            raise UnsupportedOperationException(
                f"Unable to replicate a non-MultiRegion key {key_id}"
            )

        replica_region = request.get("ReplicaRegion")
        target_store = kms_stores[account_id][replica_region]

        multi_region_config = source_metadata.get("MultiRegionConfiguration", {})
        if multi_region_config.get("MultiRegionKeyType") != MultiRegionKeyType.PRIMARY:
            raise UnsupportedOperationException(f"{key_arn} is not a multi-region primary key.")

        if key_id in target_store.keys:
            raise AlreadyExistsException(
                f"Unable to replicate key {key_id} to region {replica_region}, as the key "
                f"already exist there"
            )

        # The replica starts as a deep copy of the primary and then rewrites its own metadata.
        replica_key = copy.deepcopy(source_key)
        replica_key.replicate_metadata(request, account_id, replica_region)
        target_store.keys[key_id] = replica_key

        self.update_primary_key_with_replica_keys(source_key, replica_key, replica_region)

        # CurrentKeyMaterialId is not returned in the ReplicaKeyMetadata. May be due to not being evaluated until
        # the key has been successfully replicated as it does not show up in DescribeKey immediately either.
        response_metadata = copy.deepcopy(replica_key.metadata)
        response_metadata.pop("CurrentKeyMaterialId", None)

        return ReplicateKeyResponse(ReplicaKeyMetadata=response_metadata)
1✔
535

536
    @staticmethod
    def update_primary_key_with_replica_keys(key: KmsKey, replica_key: KmsKey, region: str):
        """
        Add a new multi-region replica key to the primary key's metadata.

        :param key: primary key whose ``MultiRegionConfiguration.ReplicaKeys`` list is extended
        :param replica_key: replica whose ARN is recorded
        :param region: region of the replica
        """
        replica_keys = key.metadata["MultiRegionConfiguration"]["ReplicaKeys"]
        replica_entry = MultiRegionKey(Arn=replica_key.metadata["Arn"], Region=region)
        replica_keys.append(replica_entry)
545

546
    @handler("UpdateKeyDescription", expand=False)
    def update_key_description(
        self, context: RequestContext, request: UpdateKeyDescriptionRequest
    ) -> None:
        """Replace the description of an enabled or disabled key."""
        new_description = request.get("Description")
        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key.metadata["Description"] = new_description
1✔
558

559
    @handler("CreateGrant", expand=False)
    def create_grant(
        self, context: RequestContext, request: CreateGrantRequest
    ) -> CreateGrantResponse:
        """
        Create a grant for a key, or return the existing grant of the same name for that key.

        The request's ``KeyId`` is overwritten with the resolved key ID before validation.
        Grants are kept in the store of the account and region of the request.
        """
        key_account_id, key_region_name, key_id = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(key_account_id, key_region_name, key_id)

        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        key_id = key.metadata.get("KeyId")
        request["KeyId"] = key_id
        self._validate_grant_request(request)
        grant_name = request.get("Name")

        store = self._get_store(context.account_id, context.region)

        # Check for existing grant with the same name for idempotency
        grant: KmsGrant | None = None
        if grant_name:
            grant = next(
                (
                    candidate
                    for candidate in store.grants.values()
                    if candidate.metadata.get("Name") == grant_name
                    and candidate.metadata.get("KeyId") == key_id
                ),
                None,
            )

        if grant is None:
            grant = KmsGrant(request, context.account_id, context.region)
            new_grant_id = grant.metadata["GrantId"]
            store.grants[new_grant_id] = grant
            store.grant_tokens[grant.token] = new_grant_id

        # At the moment we do not support multiple GrantTokens for grant creation request. Instead, we always use
        # the same token. For the reference, AWS documentation says:
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateGrant.html#API_CreateGrant_RequestParameters
        # "The returned grant token is unique with every CreateGrant request, even when a duplicate GrantId is
        # returned". "A duplicate GrantId" refers to the idempotency of grant creation requests - if a request has
        # "Name" field, and if such name already belongs to a previously created grant, no new grant gets created
        # and the existing grant with the name is returned.
        return CreateGrantResponse(GrantId=grant.metadata["GrantId"], GrantToken=grant.token)
1✔
599

600
    @handler("ListGrants", expand=False)
    def list_grants(
        self, context: RequestContext, request: ListGrantsRequest
    ) -> ListGrantsResponse:
        """
        List the grants of a key, optionally narrowed down by grant ID or grantee principal.

        :raises ValidationError: if ``KeyId`` is missing from the request
        :raises InvalidGrantIdException: if a ``GrantId`` is given that is not in the store
        """
        if not request.get("KeyId"):
            raise ValidationError("Required input parameter KeyId not specified")
        key_account_id, key_region_name, _ = self._parse_key_id(request["KeyId"], context)
        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        key = self._get_kms_key(
            key_account_id, key_region_name, request.get("KeyId"), any_key_state_allowed=True
        )
        key_id = key.metadata.get("KeyId")

        store = self._get_store(context.account_id, context.region)

        # A specific grant ID short-circuits the listing.
        requested_grant_id = request.get("GrantId")
        if requested_grant_id:
            if requested_grant_id not in store.grants:
                raise InvalidGrantIdException()
            return ListGrantsResponse(Grants=[store.grants[requested_grant_id].metadata])

        grantee_principal = request.get("GranteePrincipal")

        def _is_match(grant: KmsGrant) -> bool:
            # KeyId is a mandatory field of ListGrants request, so is going to be present.
            _, _, grant_key_id = parse_key_arn(grant.metadata["KeyArn"])
            if grant_key_id != key_id:
                return False
            # GranteePrincipal is a mandatory field for CreateGrant, should be in grants. But it is an optional
            # field for ListGrants, so might not be there.
            if grantee_principal and grant.metadata["GranteePrincipal"] != grantee_principal:
                return False
            return True

        matching_grants = [grant.metadata for grant in store.grants.values() if _is_match(grant)]

        page, next_token = PaginatedList(matching_grants).get_page(
            lambda grant_data: grant_data.get("GrantId"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 50),
        )
        if next_token:
            return ListGrantsResponse(Grants=page, NextMarker=next_token, Truncated=True)
        return ListGrantsResponse(Grants=page)
1✔
643

644
    @staticmethod
    def _delete_grant(store: KmsStore, grant_id: str, key_id: str):
        """
        Remove a grant and its token from the store.

        :param store: store holding the grant
        :param grant_id: ID of the grant to remove; has to be present in ``store.grants``
        :param key_id: key ID the grant is expected to belong to
        :raises ValidationError: if the grant belongs to a different key
        """
        grant = store.grants[grant_id]

        _, _, owning_key_id = parse_key_arn(grant.metadata.get("KeyArn"))
        if owning_key_id != key_id:
            raise ValidationError(f"Invalid KeyId={key_id} specified for grant {grant_id}")

        store.grant_tokens.pop(grant.token)
        store.grants.pop(grant_id)
1✔
654

655
    def revoke_grant(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        grant_id: GrantIdType,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> None:
        """
        Delete a grant identified by its ID and the key it belongs to.

        :raises InvalidGrantIdException: if the grant is not in the store of the request's
            account and region
        """
        # TODO add support for "dry_run"
        locator = self._parse_key_id(key_id, context)
        key = self._get_kms_key(*locator, any_key_state_allowed=True)
        resolved_key_id = key.metadata.get("KeyId")

        store = self._get_store(context.account_id, context.region)
        if grant_id not in store.grants:
            raise InvalidGrantIdException()

        self._delete_grant(store, grant_id, resolved_key_id)
1✔
674

675
    def retire_grant(
        self,
        context: RequestContext,
        grant_token: GrantTokenType = None,
        key_id: KeyIdType = None,
        grant_id: GrantIdType = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> None:
        """
        Delete a grant identified either by its grant token or by grant ID plus key ID.

        When a grant token is given, the account and region of the grant are decoded from the
        token; otherwise the store of the request's account and region is used.

        :param context: request context
        :param grant_token: base64 encoded grant token
        :param key_id: ID or ARN of the key the grant belongs to
        :param grant_id: ID of the grant (required together with ``key_id`` if no token is given)
        :param dry_run: currently ignored
        :raises ValidationException: if neither a token nor the (grant ID, key ID) pair is given
        :raises NotFoundException: if the grant token is unknown
        :raises InvalidGrantIdException: if the grant ID is not present in the grant store
        """
        # TODO add support for "dry_run"
        if not grant_token and (not grant_id or not key_id):
            raise ValidationException("Grant token OR (grant ID, key ID) must be specified")

        if grant_token:
            decoded_token = to_str(base64.b64decode(grant_token))
            grant_account_id, grant_region_name, _ = decoded_token.split(":")
            grant_store = self._get_store(grant_account_id, grant_region_name)

            if grant_token not in grant_store.grant_tokens:
                raise NotFoundException(f"Unable to find grant token {grant_token}")

            grant_id = grant_store.grant_tokens[grant_token]
        else:
            grant_store = self._get_store(context.account_id, context.region)

        if key_id:
            key_account_id, key_region_name, key_id = self._parse_key_id(key_id, context)
            key = self._get_kms_key(
                key_account_id, key_region_name, key_id, any_key_state_allowed=True
            )
            key_id = key.metadata.get("KeyId")

        # Same guard as in revoke_grant: an unknown grant ID is reported as a proper KMS error
        # instead of surfacing as a KeyError from the store lookup.
        if grant_id not in grant_store.grants:
            raise InvalidGrantIdException()

        if not key_id:
            _, _, key_id = parse_key_arn(grant_store.grants[grant_id].metadata.get("KeyArn"))

        self._delete_grant(grant_store, grant_id, key_id)
1✔
710

711
    def list_retirable_grants(
        self,
        context: RequestContext,
        retiring_principal: PrincipalIdType,
        limit: LimitType = None,
        marker: MarkerType = None,
        **kwargs,
    ) -> ListGrantsResponse:
        """
        List the grants in the request's account and region that name the given retiring principal.

        :param retiring_principal: value compared against each grant's ``RetiringPrincipal``
        :param limit: page size; 50 is used when no (or a falsy) limit is given
        :param marker: pagination token of a previous call
        :raises ValidationError: if no retiring principal is given
        """
        if not retiring_principal:
            raise ValidationError("Required input parameter 'RetiringPrincipal' not specified")

        store = self._get_store(context.account_id, context.region)
        retirable = [
            grant.metadata
            for grant in store.grants.values()
            if grant.metadata.get("RetiringPrincipal") == retiring_principal
        ]
        limit = limit or 50
        page, next_token = PaginatedList(retirable).get_page(
            lambda grant_data: grant_data.get("GrantId"),
            next_token=marker,
            page_size=limit,
        )
        if next_token:
            return ListGrantsResponse(Grants=page, NextMarker=next_token, Truncated=True)
        return ListGrantsResponse(Grants=page)
1✔
737

738
    def get_public_key(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        grant_tokens: GrantTokenList = None,
        **kwargs,
    ) -> GetPublicKeyResponse:
        """
        Return the public key of a KMS key along with its spec, usage and algorithm metadata.

        ``grant_tokens`` is accepted but not used by this implementation.
        """
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html states that GetPublicKey fails for
        # disabled keys, but AWS does not actually fail, so both enabled and disabled keys are accepted here.
        key_locator = self._parse_key_id(key_id, context)
        key = self._get_kms_key(*key_locator, enabled_key_allowed=True, disabled_key_allowed=True)

        response_fields = select_attributes(
            key.metadata,
            ["KeySpec", "KeyUsage", "EncryptionAlgorithms", "SigningAlgorithms"],
        )
        response_fields["PublicKey"] = key.crypto_key.public_key
        # The response's KeyId field carries the key ARN.
        response_fields["KeyId"] = key.metadata["Arn"]
        return GetPublicKeyResponse(**response_fields)
765

766
    def _generate_data_key_pair(
        self,
        context: RequestContext,
        key_id: str,
        key_pair_spec: str,
        encryption_context: EncryptionContextType = None,
        dry_run: NullableBooleanType = None,
    ):
        """
        Shared implementation of the GenerateDataKeyPair* operations.

        Resolves the KMS key, checks that it may be used for encryption/decryption and that
        ``key_pair_spec`` is valid, then hands the response construction to
        ``execute_dry_run_capable`` together with the ``dry_run`` flag.
        """
        key = self._get_kms_key(*self._parse_key_id(key_id, context))
        self._validate_key_for_encryption_decryption(context, key)
        KmsCryptoKey.assert_valid(key_pair_spec)

        return execute_dry_run_capable(
            self._build_data_key_pair_response,
            dry_run,
            key,
            key_pair_spec,
            encryption_context,
        )
781

782
    def _build_data_key_pair_response(
        self, key: KmsKey, key_pair_spec: str, encryption_context: EncryptionContextType = None
    ):
        """
        Create a fresh key pair for ``key_pair_spec`` and assemble the response fields for it.

        The private key is returned both as-is and encrypted with ``key`` (using
        ``encryption_context``); the result is a plain dict keyed by the response field names.
        """
        generated_pair = KmsCryptoKey(key_pair_spec)

        response = {"KeyId": key.metadata["Arn"], "KeyPairSpec": key_pair_spec}
        response["PrivateKeyCiphertextBlob"] = key.encrypt(
            generated_pair.private_key, encryption_context
        )
        response["PrivateKeyPlaintext"] = generated_pair.private_key
        response["PublicKey"] = generated_pair.public_key
        return response
794

795
    @handler("GenerateDataKeyPair")
    def generate_data_key_pair(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_pair_spec: DataKeyPairSpec,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        recipient: RecipientInfo = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> GenerateDataKeyPairResponse:
        """
        Generate a data key pair and return it including the plaintext private key.

        ``grant_tokens`` and ``recipient`` are accepted but not used by this implementation.
        """
        response_fields = self._generate_data_key_pair(
            context=context,
            key_id=key_id,
            key_pair_spec=key_pair_spec,
            encryption_context=encryption_context,
            dry_run=dry_run,
        )
        return GenerateDataKeyPairResponse(**response_fields)
811

812
    @handler("GenerateRandom", expand=False)
    def generate_random(
        self, context: RequestContext, request: GenerateRandomRequest
    ) -> GenerateRandomResponse:
        """
        Return ``NumberOfBytes`` random bytes obtained from ``os.urandom``.

        :raises ValidationException: if ``NumberOfBytes`` is missing or outside the range 1..1024
        """
        number_of_bytes = request.get("NumberOfBytes")

        if number_of_bytes is None:
            raise ValidationException("NumberOfBytes is required.")
        elif number_of_bytes > 1024:
            raise ValidationException(
                f"1 validation error detected: Value '{number_of_bytes}' at 'numberOfBytes' failed "
                "to satisfy constraint: Member must have value less than or equal to 1024"
            )
        elif number_of_bytes < 1:
            raise ValidationException(
                f"1 validation error detected: Value '{number_of_bytes}' at 'numberOfBytes' failed "
                "to satisfy constraint: Member must have value greater than or equal to 1"
            )

        return GenerateRandomResponse(Plaintext=os.urandom(number_of_bytes))
833

834
    @handler("GenerateDataKeyPairWithoutPlaintext")
    def generate_data_key_pair_without_plaintext(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_pair_spec: DataKeyPairSpec,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> GenerateDataKeyPairWithoutPlaintextResponse:
        """
        Generate a data key pair and return it without the plaintext private key.

        Uses the same generation path as GenerateDataKeyPair and then drops the
        ``PrivateKeyPlaintext`` field. ``grant_tokens`` is accepted but not used.
        """
        result = self._generate_data_key_pair(
            context, key_id, key_pair_spec, encryption_context, dry_run
        )
        result.pop("PrivateKeyPlaintext")
        # Build the response type this operation declares, not the GenerateDataKeyPair one.
        return GenerateDataKeyPairWithoutPlaintextResponse(**result)
850

851
    # We currently act neither on the KeySpec setting (which is distinct from, and holds different values than, the
852
    # KeySpec for CreateKey) nor on NumberOfBytes. Instead, we generate a key with a key length that is "standard" in
853
    # LocalStack.
854
    #
855
    def _generate_data_key(
        self, context: RequestContext, key_id: str, encryption_context: EncryptionContextType = None
    ):
        """
        Shared implementation of the GenerateDataKey* operations.

        Creates a new ``SYMMETRIC_DEFAULT`` crypto key and returns its key material both in plaintext
        and encrypted with the requested KMS key, as a dict keyed by the response field names.
        """
        key = self._get_kms_key(*self._parse_key_id(key_id, context))
        # TODO Should also have a validation for the key being a symmetric one.
        self._validate_key_for_encryption_decryption(context, key)

        data_key = KmsCryptoKey("SYMMETRIC_DEFAULT")
        response = {"KeyId": key.metadata["Arn"]}
        response["Plaintext"] = data_key.key_material
        response["CiphertextBlob"] = key.encrypt(data_key.key_material, encryption_context)
        return response
868

869
    @handler("GenerateDataKey", expand=False)
    def generate_data_key(
        self, context: RequestContext, request: GenerateDataKeyRequest
    ) -> GenerateDataKeyResponse:
        """Generate a data key and return it in both plaintext and encrypted form."""
        target_key_id = request.get("KeyId")
        encryption_context = request.get("EncryptionContext")
        return GenerateDataKeyResponse(
            **self._generate_data_key(context, target_key_id, encryption_context)
        )
877

878
    @handler("GenerateDataKeyWithoutPlaintext", expand=False)
    def generate_data_key_without_plaintext(
        self, context: RequestContext, request: GenerateDataKeyWithoutPlaintextRequest
    ) -> GenerateDataKeyWithoutPlaintextResponse:
        """Generate a data key and return only its encrypted form (the plaintext is dropped)."""
        target_key_id = request.get("KeyId")
        encryption_context = request.get("EncryptionContext")

        response_fields = self._generate_data_key(context, target_key_id, encryption_context)
        del response_fields["Plaintext"]
        return GenerateDataKeyWithoutPlaintextResponse(**response_fields)
887

888
    @handler("GenerateMac", expand=False)
    def generate_mac(
        self,
        context: RequestContext,
        request: GenerateMacRequest,
    ) -> GenerateMacResponse:
        """
        Compute a MAC over the request's ``Message`` with the requested KMS key and ``MacAlgorithm``.

        The message length is validated first, then the key and finally the algorithm.
        """
        message = request.get("Message")
        self._validate_mac_msg_length(message)

        key = self._get_kms_key(*self._parse_key_id(request["KeyId"], context))
        self._validate_key_for_generate_verify_mac(context, key)

        mac_algorithm = request.get("MacAlgorithm")
        self._validate_mac_algorithm(key, mac_algorithm)

        return GenerateMacResponse(
            Mac=key.generate_mac(message, mac_algorithm),
            MacAlgorithm=mac_algorithm,
            KeyId=key.metadata.get("Arn"),
        )
908

909
    @handler("VerifyMac", expand=False)
    def verify_mac(
        self,
        context: RequestContext,
        request: VerifyMacRequest,
    ) -> VerifyMacResponse:
        """
        Check the request's ``Mac`` against its ``Message`` using the requested key and algorithm.

        The message length is validated first, then the key and finally the algorithm; the outcome
        of ``key.verify_mac`` is returned in ``MacValid``.
        """
        message = request.get("Message")
        self._validate_mac_msg_length(message)

        key = self._get_kms_key(*self._parse_key_id(request["KeyId"], context))
        self._validate_key_for_generate_verify_mac(context, key)

        mac_algorithm = request.get("MacAlgorithm")
        self._validate_mac_algorithm(key, mac_algorithm)

        is_mac_valid = key.verify_mac(message, request.get("Mac"), mac_algorithm)
        return VerifyMacResponse(
            KeyId=key.metadata.get("Arn"),
            MacValid=is_mac_valid,
            MacAlgorithm=mac_algorithm,
        )
931

932
    @handler("Sign", expand=False)
    def sign(self, context: RequestContext, request: SignRequest) -> SignResponse:
        """Sign the request's ``Message`` with the requested KMS key and ``SigningAlgorithm``."""
        key = self._get_kms_key(*self._parse_key_id(request["KeyId"], context))
        self._validate_key_for_sign_verify(context, key)

        # TODO Add constraints on KeySpec / SigningAlgorithm pairs:
        #  https://docs.aws.amazon.com/kms/latest/developerguide/asymmetric-key-specs.html#key-spec-ecc

        algorithm = request.get("SigningAlgorithm")
        signature = key.sign(request.get("Message"), request.get("MessageType"), algorithm)

        return SignResponse(
            KeyId=key.metadata["Arn"],
            Signature=signature,
            SigningAlgorithm=algorithm,
        )
951

952
    # Currently LocalStack only calculates SHA256 digests no matter what the signing algorithm is.
953
    @handler("Verify", expand=False)
    def verify(self, context: RequestContext, request: VerifyRequest) -> VerifyResponse:
        """
        Verify the request's ``Signature`` over its ``Message`` with the requested KMS key.

        The outcome of ``key.verify`` is returned in ``SignatureValid``.
        """
        key = self._get_kms_key(*self._parse_key_id(request["KeyId"], context))
        self._validate_key_for_sign_verify(context, key)

        algorithm = request.get("SigningAlgorithm")
        signature_ok = key.verify(
            request.get("Message"),
            request.get("MessageType"),
            algorithm,
            request.get("Signature"),
        )

        return VerifyResponse(
            KeyId=key.metadata["Arn"],
            SignatureValid=signature_ok,
            SigningAlgorithm=algorithm,
        )
974

975
    def re_encrypt(
        self,
        context: RequestContext,
        ciphertext_blob: CiphertextType,
        destination_key_id: KeyIdType,
        source_encryption_context: EncryptionContextType = None,
        source_key_id: KeyIdType = None,
        destination_encryption_context: EncryptionContextType = None,
        source_encryption_algorithm: EncryptionAlgorithmSpec = None,
        destination_encryption_algorithm: EncryptionAlgorithmSpec = None,
        grant_tokens: GrantTokenList = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> ReEncryptResponse:
        """
        Decrypt ``ciphertext_blob`` with the source key and encrypt the result with the destination key.

        Implemented as a ``decrypt`` call followed by an ``encrypt`` call on this provider. ``dry_run``
        is only forwarded to the ``encrypt`` step. The algorithm parameters are echoed back in the
        response; only the source algorithm is passed on (to ``decrypt``).
        """
        # TODO: when implementing, ensure cross-account support for source_key_id and destination_key_id
        # NOTE(review): source_key_id defaults to None but is handed to _parse_key_id unconditionally --
        #  confirm that _parse_key_id tolerates None or that callers always supply a source key.

        # Step 1: resolve the source key and decrypt with it.
        src_account_id, src_region_name, source_key_id = self._parse_key_id(source_key_id, context)
        source_key = self._get_kms_key(src_account_id, src_region_name, source_key_id)
        decrypted = self.decrypt(
            context=context,
            ciphertext_blob=ciphertext_blob,
            encryption_context=source_encryption_context,
            encryption_algorithm=source_encryption_algorithm,
            key_id=source_key_id,
            grant_tokens=grant_tokens,
        )

        # Step 2: resolve the destination key and encrypt the recovered plaintext with it.
        dst_account_id, dst_region_name, destination_key_id = self._parse_key_id(
            destination_key_id, context
        )
        destination_key = self._get_kms_key(dst_account_id, dst_region_name, destination_key_id)
        encrypted = self.encrypt(
            context=context,
            encryption_context=destination_encryption_context,
            key_id=destination_key_id,
            plaintext=decrypted["Plaintext"],
            grant_tokens=grant_tokens,
            dry_run=dry_run,
        )

        return ReEncryptResponse(
            CiphertextBlob=encrypted["CiphertextBlob"],
            SourceKeyId=source_key.metadata.get("Arn"),
            KeyId=destination_key.metadata.get("Arn"),
            SourceEncryptionAlgorithm=source_encryption_algorithm,
            DestinationEncryptionAlgorithm=destination_encryption_algorithm,
        )
1023

1024
    def encrypt(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        plaintext: PlaintextType,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        encryption_algorithm: EncryptionAlgorithmSpec = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> EncryptResponse:
        """
        Encrypt ``plaintext`` with the requested KMS key.

        The plaintext and the key are validated before encrypting. ``grant_tokens`` and ``dry_run``
        are accepted but not used; ``encryption_algorithm`` is echoed back in the response.
        """
        # TODO add support for "dry_run"
        key = self._get_kms_key(*self._parse_key_id(key_id, context))

        # Validation order is kept as-is: plaintext checks first, then key usage, then key state.
        self._validate_plaintext_length(plaintext)
        self._validate_plaintext_key_type_based(plaintext, key, encryption_algorithm)
        self._validate_key_for_encryption_decryption(context, key)
        self._validate_key_state_not_pending_import(key)

        encrypted_blob = key.encrypt(plaintext, encryption_context)

        # For compatibility, we return EncryptionAlgorithm values expected from AWS. But LocalStack currently always
        # encrypts with symmetric encryption no matter the key settings.
        response = EncryptResponse(
            CiphertextBlob=encrypted_blob,
            KeyId=key.metadata.get("Arn"),
            EncryptionAlgorithm=encryption_algorithm,
        )
        return response
1051

1052
    # TODO We currently do not even check encryption_context, while moto does. Should add the corresponding logic later.
1053
    def decrypt(
        self,
        context: RequestContext,
        ciphertext_blob: CiphertextType,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        key_id: KeyIdType = None,
        encryption_algorithm: EncryptionAlgorithmSpec = None,
        recipient: RecipientInfo = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> DecryptResponse:
        """
        Decrypt ``ciphertext_blob`` and return the plaintext, or the plaintext re-encrypted for a recipient.

        When ``key_id`` is given, that key is used and a blob that cannot be deserialized is tolerated
        (needed for the RSA path below). Without ``key_id`` the key is taken from the deserialized blob.
        ``grant_tokens`` and ``dry_run`` are accepted but not used.

        :raises InvalidCiphertextException: if the blob cannot be deserialized and no RSA fallback applies,
            or if decryption fails with ``InvalidTag``
        :raises IncorrectKeyException: if the blob was deserialized and names a different key than requested
        """
        # In AWS, key_id is only supplied for data encrypted with an asymmetrical algorithm. For symmetrical
        # encryption, key_id is taken from the encrypted data itself.
        # Since LocalStack doesn't currently do asymmetrical encryption, there is a question of modeling here: we
        # currently expect data to be only encrypted with symmetric encryption, so having key_id inside. It might not
        # always be what customers expect.
        if key_id:
            account_id, region_name, key_id = self._parse_key_id(key_id, context)
            try:
                ciphertext = deserialize_ciphertext_blob(ciphertext_blob=ciphertext_blob)
            except Exception as e:
                # Deliberately best-effort: the blob may be raw RSA ciphertext, which is handled further down.
                logging.error("Error deserializing ciphertext blob: %s", e)
                ciphertext = None
        else:
            try:
                ciphertext = deserialize_ciphertext_blob(ciphertext_blob=ciphertext_blob)
                account_id, region_name, key_id = self._parse_key_id(ciphertext.key_id, context)
            except Exception as e:
                raise InvalidCiphertextException(
                    "LocalStack is unable to deserialize the ciphertext blob. Perhaps the "
                    "blob didn't come from LocalStack"
                ) from e

        key = self._get_kms_key(account_id, region_name, key_id)
        if ciphertext and key.metadata["KeyId"] != ciphertext.key_id:
            raise IncorrectKeyException(
                "The key ID in the request does not identify a CMK that can perform this operation."
            )

        self._validate_key_for_encryption_decryption(context, key)
        self._validate_key_state_not_pending_import(key)

        # Handle the recipient field.  This is used by AWS Nitro to re-encrypt the plaintext to the key specified
        # by the enclave.  Proper support for this will take significant work to figure out how to model enforcing
        # the attestation measurements; for now, if recipient is specified and has an attestation doc in it including
        # a public key where it's expected to be, we encrypt to that public key.  This at least allows users to use
        # localstack as a drop-in replacement for AWS when testing without having to skip the secondary decryption
        # when using localstack.
        recipient_pubkey = None
        if recipient:
            attestation_document = recipient["AttestationDocument"]
            # We do all of this in a try/catch and warn if it fails so that if users are currently passing a nonsense
            # value we don't break it for them.  In the future we could do a breaking change to require a valid attestation
            # (or at least one that contains the public key).
            try:
                recipient_pubkey = self._extract_attestation_pubkey(attestation_document)
            except Exception as e:
                logging.warning(
                    "Unable to extract public key from non-empty attestation document: %s", e
                )

        try:
            # TODO: Extend the implementation to handle additional encryption/decryption scenarios
            # beyond the current support for offline encryption and online decryption using RSA keys if key id exists in
            # parameters, where `ciphertext_blob` will not be deserializable.
            if self._is_rsa_spec(key.crypto_key.key_spec) and not ciphertext:
                plaintext = key.decrypt_rsa(ciphertext_blob)
            else:
                # if symmetric encryption then ciphertext must not be None
                if ciphertext is None:
                    raise InvalidCiphertextException()
                plaintext = key.decrypt(ciphertext, encryption_context)
        except InvalidTag as e:
            raise InvalidCiphertextException() from e

        # For compatibility, we return EncryptionAlgorithm values expected from AWS. But LocalStack currently always
        # encrypts with symmetric encryption no matter the key settings.
        #
        # We return a key ARN instead of KeyId despite the name of the parameter, as this is what AWS does and states
        # in its docs.
        #  https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html#API_Decrypt_RequestSyntax
        # TODO add support for "dry_run"
        response = DecryptResponse(
            KeyId=key.metadata.get("Arn"),
            EncryptionAlgorithm=encryption_algorithm,
        )

        # Encrypt to the recipient pubkey if specified.  Otherwise, return the actual plaintext
        if recipient_pubkey:
            response["CiphertextForRecipient"] = pkcs7_envelope_encrypt(plaintext, recipient_pubkey)
        else:
            response["Plaintext"] = plaintext

        return response
1149

1150
    def get_parameters_for_import(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        wrapping_algorithm: AlgorithmSpec,
        wrapping_key_spec: WrappingKeySpec,
        **kwargs,
    ) -> GetParametersForImportResponse:
        """
        Create a wrapping key and an import token for importing key material into a KMS key.

        The import state (target key id, token, wrapping algorithm and wrapping key) is stored in
        the caller's store under the token.

        :raises UnsupportedOperationException: if the key's ``Origin`` is not ``EXTERNAL``
        """
        store = self._get_store(context.account_id, context.region)
        # KeyId can potentially hold one of multiple different types of key identifiers. get_key finds a key no
        # matter which type of id is used.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key_arn = target_key.metadata["Arn"]
        key_origin = target_key.metadata.get("Origin")
        if key_origin != "EXTERNAL":
            raise UnsupportedOperationException(
                f"{key_arn} origin is {key_origin} which is not valid for this operation."
            )

        wrapping_key = KmsKey(CreateKeyRequest(KeySpec=wrapping_key_spec))
        token = short_uid()
        import_state = KeyImportState(
            key_id=target_key.metadata["KeyId"],
            import_token=token,
            wrapping_algo=wrapping_algorithm,
            key=wrapping_key,
        )
        store.imports[token] = import_state

        # https://docs.aws.amazon.com/kms/latest/APIReference/API_GetParametersForImport.html
        # "To import key material, you must use the public key and import token from the same response. These items
        # are valid for 24 hours."
        # NOTE(review): the validity reported here is 100 days, not the 24 hours quoted above -- confirm intended.
        valid_until = datetime.datetime.now() + datetime.timedelta(days=100)

        return GetParametersForImportResponse(
            KeyId=key_arn,
            ImportToken=to_bytes(import_state.import_token),
            PublicKey=import_state.key.crypto_key.public_key,
            ParametersValidTo=valid_until,
        )
1194

1195
    def import_key_material(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        import_token: CiphertextType,
        encrypted_key_material: CiphertextType,
        valid_to: DateType | None = None,
        expiration_model: ExpirationModelType | None = None,
        import_type: ImportType | None = None,
        key_material_description: KeyMaterialDescriptionType | None = None,
        key_material_id: BackingKeyIdType | None = None,
        **kwargs,
    ) -> ImportKeyMaterialResponse:
        """
        Import wrapped key material into a KMS key using a previously issued import token.

        The key's metadata is only modified once the whole request has been validated, so a rejected
        request leaves the key untouched. ``import_type`` and ``key_material_description`` are accepted
        but not used, and the ``key_material_id`` argument is replaced by an id derived from the material.

        :raises NotFoundException: if the import token is unknown in the caller's store
        :raises ValidationException: if the expiration model is KEY_MATERIAL_EXPIRES and ``valid_to`` is unset
        :raises KMSInvalidStateException: if the key already has pending key material
        """
        store = self._get_store(context.account_id, context.region)
        import_token = to_str(import_token)
        import_state = store.imports.get(import_token)
        if not import_state:
            raise NotFoundException(f"Unable to find key import token '{import_token}'")
        key_to_import_material_to = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )

        # TODO check if there was already a key imported for this kms key
        # if so, it has to be identical. We cannot change keys by reimporting after deletion/expiry
        key_material = self._decrypt_wrapped_key_material(import_state, encrypted_key_material)
        key_material_id = key_to_import_material_to.generate_key_material_id(key_material)

        # Validate first, using a local value: writing ExpirationModel into the metadata before these
        # checks would leave a rejected request visible on the key.
        effective_expiration_model = expiration_model or ExpirationModelType.KEY_MATERIAL_EXPIRES
        if (
            effective_expiration_model == ExpirationModelType.KEY_MATERIAL_EXPIRES
            and not valid_to
        ):
            raise ValidationException(
                "A validTo date must be set if the ExpirationModel is KEY_MATERIAL_EXPIRES"
            )
        if existing_pending_material := key_to_import_material_to.crypto_key.pending_key_material:
            pending_key_material_id = key_to_import_material_to.generate_key_material_id(
                existing_pending_material
            )
            raise KMSInvalidStateException(
                f"New key material (id: {key_material_id}) cannot be imported into KMS key "
                f"{key_to_import_material_to.metadata['Arn']}, because another key material "
                f"(id: {pending_key_material_id}) is pending rotation."
            )

        # TODO actually set validTo and make the key expire
        key_to_import_material_to.metadata["ExpirationModel"] = effective_expiration_model
        key_to_import_material_to.metadata["Enabled"] = True
        key_to_import_material_to.metadata["KeyState"] = KeyState.Enabled
        key_to_import_material_to.crypto_key.load_key_material(key_material)

        # KeyMaterialId / CurrentKeyMaterialId is only exposed for symmetric encryption keys.
        key_material_id_response = None
        if key_to_import_material_to.metadata["KeySpec"] == KeySpec.SYMMETRIC_DEFAULT:
            key_material_id_response = key_to_import_material_to.generate_key_material_id(
                key_material
            )

            # If there is no CurrentKeyMaterialId, instantly promote the pending key material to the current.
            if key_to_import_material_to.metadata.get("CurrentKeyMaterialId") is None:
                key_to_import_material_to.metadata["CurrentKeyMaterialId"] = (
                    key_material_id_response
                )
                key_to_import_material_to.crypto_key.key_material = (
                    key_to_import_material_to.crypto_key.pending_key_material
                )
                key_to_import_material_to.crypto_key.pending_key_material = None

        return ImportKeyMaterialResponse(
            KeyId=key_to_import_material_to.metadata["Arn"],
            KeyMaterialId=key_material_id_response,
        )
1272

1273
    def delete_imported_key_material(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_material_id: BackingKeyIdType | None = None,
        **kwargs,
    ) -> DeleteImportedKeyMaterialResponse:
        """
        Remove the key material of a KMS key and put the key back into the ``PendingImport`` state.

        The key is also disabled and its ``ExpirationModel`` metadata entry is dropped.
        ``key_material_id`` is accepted but not used yet. An empty response is returned.
        """
        # TODO add support for key_material_id
        key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )

        key.crypto_key.key_material = None

        metadata = key.metadata
        metadata["Enabled"] = False
        metadata["KeyState"] = KeyState.PendingImport
        metadata.pop("ExpirationModel", None)

        # TODO populate DeleteImportedKeyMaterialResponse
        return DeleteImportedKeyMaterialResponse()
1295

1296
    @handler("CreateAlias", expand=False)
    def create_alias(self, context: RequestContext, request: CreateAliasRequest) -> None:
        """
        Create an alias for a KMS key in the caller's account and region.

        The request's ``TargetKeyId`` is replaced by the resolved key's ``KeyId`` before the alias
        is created.

        :raises AlreadyExistsException: if an alias with that name already exists in the store
        """
        account_id, region_name = context.account_id, context.region
        store = self._get_store(account_id, region_name)

        alias_name = request["AliasName"]
        validate_alias_name(alias_name)
        if alias_name in store.aliases:
            # AWS itself uses AliasArn instead of AliasName in this exception.
            alias_arn = store.aliases.get(alias_name).metadata["AliasArn"]
            raise AlreadyExistsException(f"An alias with the name {alias_arn} already exists")

        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        target_key = self._get_kms_key(
            account_id,
            region_name,
            request.get("TargetKeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        request["TargetKeyId"] = target_key.metadata.get("KeyId")
        self._create_kms_alias(account_id, region_name, request)
1316

1317
    @handler("DeleteAlias", expand=False)
    def delete_alias(self, context: RequestContext, request: DeleteAliasRequest) -> None:
        """
        Remove an alias from the caller's account and region store.

        :raises NotFoundException: if no alias with the requested name exists in the store
        """
        # We do not check the state of the key, as, according to AWS docs, all key states, that are possible in
        # LocalStack, are supported by this operation.
        aliases = self._get_store(context.account_id, context.region).aliases
        alias_name = request["AliasName"]

        if alias_name in aliases:
            aliases.pop(alias_name, None)
            return

        # AWS itself uses AliasArn instead of AliasName in this exception.
        alias_arn = kms_alias_arn(request["AliasName"], context.account_id, context.region)
        raise NotFoundException(f"Alias {alias_arn} is not found")
1328

1329
    @handler("UpdateAlias", expand=False)
    def update_alias(self, context: RequestContext, request: UpdateAliasRequest) -> None:
        """
        Point an existing alias at a different KMS key.

        The alias stores the resolved ``KeyId`` of the target key, not the raw identifier from the
        request. This matches what ``create_alias`` stores and what ``list_aliases`` filters on.
        """
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the source KMS key is pending deletion, the command succeeds. If the destination KMS key is pending
        # deletion, the command fails with error: KMSInvalidStateException : <key ARN> is pending deletion."
        # Also disabled keys are accepted for this operation (see the table on that page).
        #
        # As such, we do not care about the state of the source key, but check the destination one.

        alias_name = request["AliasName"]
        # This API, per AWS docs, accepts only names, not ARNs.
        validate_alias_name(alias_name)
        alias = self._get_kms_alias(context.account_id, context.region, alias_name)

        # Fetching the key validates its state and yields the canonical key id for the alias metadata.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request["TargetKeyId"],
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        alias.metadata["TargetKeyId"] = target_key.metadata.get("KeyId")
        alias.update_date_of_last_update()
1353

1354
    @handler("ListAliases")
1✔
1355
    def list_aliases(
        self,
        context: RequestContext,
        key_id: KeyIdType = None,
        limit: LimitType = None,
        marker: MarkerType = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """Return one page of alias metadata, optionally restricted to a single key.

        When ``key_id`` is given, only aliases whose ``TargetKeyId`` equals the resolved key's
        ``KeyId`` are returned. The page size defaults to 100 when ``limit`` is not set.
        ``NextMarker`` and ``Truncated`` are only included when a next page exists.
        """
        store = self._get_store(context.account_id, context.region)

        if key_id:
            # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
            # matter which type of id is used.
            resolved_key = self._get_kms_key(
                context.account_id, context.region, key_id, any_key_state_allowed=True
            )
            key_id = resolved_key.metadata.get("KeyId")

        matching_aliases = [
            alias.metadata
            for alias in store.aliases.values()
            if not key_id or alias.metadata["TargetKeyId"] == key_id
        ]

        page, next_token = PaginatedList(matching_aliases).get_page(
            lambda alias_metadata: alias_metadata.get("AliasName"),
            next_token=marker,
            page_size=limit or 100,
        )

        if next_token:
            return ListAliasesResponse(Aliases=page, NextMarker=next_token, Truncated=True)
        return ListAliasesResponse(Aliases=page)
1✔
1386

1387
    @handler("GetKeyRotationStatus", expand=False)
1✔
1388
    def get_key_rotation_status(
        self, context: RequestContext, request: GetKeyRotationStatusRequest
    ) -> GetKeyRotationStatusResponse:
        """Report whether rotation is enabled for a key, plus its next rotation date.

        ``RotationPeriodInDays`` is only added to the response while rotation is enabled.
        """
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        # We do not model that here, though.
        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(account_id, region_name, key_id, any_key_state_allowed=True)

        status_fields = {
            "KeyId": key.metadata["Arn"],
            "KeyRotationEnabled": key.is_key_rotation_enabled,
            "NextRotationDate": key.next_rotation_date,
        }
        if key.is_key_rotation_enabled:
            status_fields["RotationPeriodInDays"] = key.rotation_period_in_days

        return GetKeyRotationStatusResponse(**status_fields)
1✔
1406

1407
    @handler("DisableKeyRotation", expand=False)
1✔
1408
    def disable_key_rotation(
        self, context: RequestContext, request: DisableKeyRotationRequest
    ) -> None:
        """Switch off the rotation flag on the key identified by the request's ``KeyId``."""
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        # We do not model that here, though.
        target_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId")
        )
        target_key.is_key_rotation_enabled = False
1✔
1416

1417
    @handler("EnableKeyRotation", expand=False)
1✔
1418
    def enable_key_rotation(
        self, context: RequestContext, request: EnableKeyRotationRequest
    ) -> None:
        """Switch on the rotation flag on a key and refresh its rotation date.

        A truthy ``RotationPeriodInDays`` in the request replaces the key's current rotation
        period; otherwise the existing period is left as it is.
        """
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        # We do not model that here, though.
        target_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId")
        )
        target_key.is_key_rotation_enabled = True

        if rotation_period := request.get("RotationPeriodInDays"):
            target_key.rotation_period_in_days = rotation_period

        target_key._update_key_rotation_date()
1✔
1429

1430
    @handler("ListKeyPolicies", expand=False)
1✔
1431
    def list_key_policies(
        self, context: RequestContext, request: ListKeyPoliciesRequest
    ) -> ListKeyPoliciesResponse:
        """Return the fixed policy-name list (``["default"]``) after looking up the key."""
        # Only the key lookup matters here. By AWS specifications the response is the same for all keys,
        # because the only supported policy is "default":
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeyPolicies.html#API_ListKeyPolicies_ResponseElements
        requested_key_id = request.get("KeyId")
        self._get_kms_key(
            context.account_id, context.region, requested_key_id, any_key_state_allowed=True
        )
        return ListKeyPoliciesResponse(PolicyNames=["default"], Truncated=False)
1✔
1441

1442
    @handler("PutKeyPolicy", expand=False)
1✔
1443
    def put_key_policy(self, context: RequestContext, request: PutKeyPolicyRequest) -> None:
        """Replace the policy document stored on a key.

        Raises:
            UnsupportedOperationException: if ``PolicyName`` is anything other than "default".
        """
        # The key is looked up first, so a failing lookup takes precedence over the policy-name check.
        target_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), any_key_state_allowed=True
        )
        if request.get("PolicyName") == "default":
            target_key.policy = request.get("Policy")
            return
        raise UnsupportedOperationException("Only default policy is supported")
1✔
1450

1451
    @handler("GetKeyPolicy", expand=False)
1✔
1452
    def get_key_policy(
        self, context: RequestContext, request: GetKeyPolicyRequest
    ) -> GetKeyPolicyResponse:
        """Return the policy document stored on a key.

        Raises:
            NotFoundException: if ``PolicyName`` is anything other than "default".
        """
        # The key is looked up first, so a failing lookup takes precedence over the policy-name check.
        target_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), any_key_state_allowed=True
        )
        if request.get("PolicyName") == "default":
            return GetKeyPolicyResponse(Policy=target_key.policy)
        raise NotFoundException("No such policy exists")
1✔
1461

1462
    @handler("ListResourceTags", expand=False)
1✔
1463
    def list_resource_tags(
        self, context: RequestContext, request: ListResourceTagsRequest
    ) -> ListResourceTagsResponse:
        """Return one page of a key's tags as ``{"TagKey": ..., "TagValue": ...}`` entries.

        The page size is the request's ``Limit`` (50 when the field is absent). ``NextMarker``
        and ``Truncated`` are only included when a next page exists.
        """
        key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), any_key_state_allowed=True
        )
        tag_entries = [
            {"TagKey": tag_key, "TagValue": tag_value} for tag_key, tag_value in key.tags.items()
        ]
        page, next_token = PaginatedList(tag_entries).get_page(
            lambda tag: tag.get("TagKey"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 50),
        )

        if next_token:
            return ListResourceTagsResponse(Tags=page, NextMarker=next_token, Truncated=True)
        return ListResourceTagsResponse(Tags=page)
1✔
1479

1480
    @handler("RotateKeyOnDemand", expand=False)
1✔
1481
    # TODO: return the key rotations in the ListKeyRotations operation
1482
    def rotate_key_on_demand(
        self, context: RequestContext, request: RotateKeyOnDemandRequest
    ) -> RotateKeyOnDemandResponse:
        """Trigger an immediate rotation of a key and return the key's ARN.

        Raises:
            UnsupportedOperationException: if the key spec is not ``SYMMETRIC_DEFAULT``.
            KMSInvalidStateException: if the key is pending import, or has origin "EXTERNAL"
                and no pending key material (see the two validation helpers called below).
        """
        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(account_id, region_name, key_id)

        is_symmetric_default = key.metadata["KeySpec"] == KeySpec.SYMMETRIC_DEFAULT
        if not is_symmetric_default:
            raise UnsupportedOperationException()

        # Validation order matters: it decides which error is reported when several apply.
        self._validate_key_state_not_pending_import(key)
        self._validate_external_key_has_pending_material(key)

        key.rotate_key_on_demand()

        return RotateKeyOnDemandResponse(KeyId=key.metadata["Arn"])
1498

1499
    @handler("TagResource", expand=False)
1✔
1500
    def tag_resource(self, context: RequestContext, request: TagResourceRequest) -> None:
        """Add the request's ``Tags`` to the key identified by ``KeyId``."""
        # The lookup explicitly allows both enabled and disabled keys.
        allowed_states = {"enabled_key_allowed": True, "disabled_key_allowed": True}
        target_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), **allowed_states
        )
        target_key.add_tags(request.get("Tags"))
1✔
1509

1510
    @handler("UntagResource", expand=False)
1✔
1511
    def untag_resource(self, context: RequestContext, request: UntagResourceRequest) -> None:
        """Remove the tags listed in ``TagKeys`` from the key identified by ``KeyId``.

        A missing or empty ``TagKeys`` is a no-op (the key is still looked up first).
        """
        # The lookup explicitly allows both enabled and disabled keys.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        for tag_key in request.get("TagKeys") or ():
            # AWS doesn't seem to mind removal of a non-existent tag, so we do not raise any exception.
            target_key.tags.pop(tag_key, None)
1✔
1524

1525
    def derive_shared_secret(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_agreement_algorithm: KeyAgreementAlgorithmSpec,
        public_key: PublicKeyType,
        grant_tokens: GrantTokenList = None,
        dry_run: NullableBooleanType = None,
        recipient: RecipientInfo = None,
        **kwargs,
    ) -> DeriveSharedSecretResponse:
        """Derive a shared secret from a KMS key and a peer public key.

        Checks are applied in this order: key usage, key agreement algorithm, key origin.
        ``grant_tokens``, ``dry_run`` and ``recipient`` are accepted but not used here.

        Raises:
            InvalidKeyUsageException: if the key usage is not ``KEY_AGREEMENT``.
            ValidationException: if the algorithm is not ``ECDH``.
            ValueError: if the key origin is neither ``AWS_KMS`` nor ``EXTERNAL``.
        """
        key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        metadata = key.metadata
        key_usage = metadata.get("KeyUsage")
        key_origin = metadata.get("Origin")

        if key_usage != KeyUsageType.KEY_AGREEMENT:
            raise InvalidKeyUsageException(
                f"{metadata['Arn']} key usage is {key_usage} which is not valid for {context.operation.name}."
            )

        if key_agreement_algorithm != KeyAgreementAlgorithmSpec.ECDH:
            raise ValidationException(
                f"1 validation error detected: Value '{key_agreement_algorithm}' at 'keyAgreementAlgorithm' "
                f"failed to satisfy constraint: Member must satisfy enum value set: [ECDH]"
            )

        # TODO: Verify the actual error raised
        supported_origins = (OriginType.AWS_KMS, OriginType.EXTERNAL)
        if key_origin not in supported_origins:
            raise ValueError(f"Key origin: {key_origin} is not valid for {context.operation.name}.")

        return DeriveSharedSecretResponse(
            KeyId=key_id,
            SharedSecret=key.derive_shared_secret(public_key),
            KeyAgreementAlgorithm=key_agreement_algorithm,
            KeyOrigin=key_origin,
        )
1568

1569
    def _validate_key_state_not_pending_import(self, key: KmsKey):
        """Raise KMSInvalidStateException when the key's ``KeyState`` is ``PendingImport``."""
        if key.metadata["KeyState"] != KeyState.PendingImport:
            return
        raise KMSInvalidStateException(f"{key.metadata['Arn']} is pending import.")
1✔
1572

1573
    def _validate_external_key_has_pending_material(self, key: KmsKey):
        """Raise KMSInvalidStateException for an "EXTERNAL"-origin key without pending key material.

        Keys with any other origin pass without their crypto key being inspected.
        """
        if key.metadata["Origin"] != "EXTERNAL":
            return
        if key.crypto_key.pending_key_material is not None:
            return
        raise KMSInvalidStateException(
            f"No available key material pending rotation for the key: {key.metadata['Arn']}."
        )
1578

1579
    def _validate_key_for_encryption_decryption(self, context: RequestContext, key: KmsKey):
        """Raise InvalidKeyUsageException unless the key's ``KeyUsage`` is "ENCRYPT_DECRYPT".

        The error message names the key ARN, the actual usage and the current operation.
        """
        key_usage = key.metadata["KeyUsage"]
        if key_usage == "ENCRYPT_DECRYPT":
            return
        raise InvalidKeyUsageException(
            f"{key.metadata['Arn']} key usage is {key_usage} which is not valid for {context.operation.name}."
        )
1585

1586
    def _validate_key_for_sign_verify(self, context: RequestContext, key: KmsKey):
        """Raise InvalidKeyUsageException unless the key's ``KeyUsage`` is "SIGN_VERIFY".

        The error message names the key ARN, the actual usage and the current operation.
        """
        key_usage = key.metadata["KeyUsage"]
        if key_usage == "SIGN_VERIFY":
            return
        raise InvalidKeyUsageException(
            f"{key.metadata['Arn']} key usage is {key_usage} which is not valid for {context.operation.name}."
        )
1592

1593
    def _validate_key_for_generate_verify_mac(self, context: RequestContext, key: KmsKey):
        """Raise InvalidKeyUsageException unless the key's ``KeyUsage`` is "GENERATE_VERIFY_MAC".

        The error message names the key ARN, the actual usage and the current operation.
        """
        key_usage = key.metadata["KeyUsage"]
        if key_usage == "GENERATE_VERIFY_MAC":
            return
        raise InvalidKeyUsageException(
            f"{key.metadata['Arn']} key usage is {key_usage} which is not valid for {context.operation.name}."
        )
1599

1600
    def _validate_mac_msg_length(self, msg: bytes):
1✔
1601
        if len(msg) > 4096:
1✔
UNCOV
1602
            raise ValidationException(
×
1603
                "1 validation error detected: Value at 'message' failed to satisfy constraint: "
1604
                "Member must have length less than or equal to 4096"
1605
            )
1606

1607
    def _validate_mac_algorithm(self, key: KmsKey, algorithm: str):
        """Check that ``algorithm`` is a known MAC algorithm and fits the key's spec.

        Raises:
            ValidationException: if ``MacAlgorithmSpec`` has no attribute named ``algorithm``.
            InvalidKeyUsageException: if the algorithm name has three ``_``-separated parts and
                its first and third parts joined by ``_`` differ from the key's ``KeySpec``.
        """
        if not hasattr(MacAlgorithmSpec, algorithm):
            raise ValidationException(
                f"1 validation error detected: Value '{algorithm}' at 'macAlgorithm' "
                f"failed to satisfy constraint: Member must satisfy enum value set: "
                f"[HMAC_SHA_384, HMAC_SHA_256, HMAC_SHA_224, HMAC_SHA_512]"
            )

        key_spec = key.metadata["KeySpec"]
        name_parts = algorithm.split("_")
        if len(name_parts) != 3:
            # Names that do not split into exactly three parts are not compared against the key spec.
            return

        # e.g. "HMAC_SHA_256" -> "HMAC_256", which is then compared with the key spec.
        prefix, _, suffix = name_parts
        if f"{prefix}_{suffix}" != key_spec:
            raise InvalidKeyUsageException(
                f"Algorithm {algorithm} is incompatible with key spec {key_spec}."
            )
1621

1622
    def _validate_plaintext_length(self, plaintext: bytes):
1✔
1623
        if len(plaintext) > 4096:
1✔
1624
            raise ValidationException(
1✔
1625
                "1 validation error detected: Value at 'plaintext' failed to satisfy constraint: "
1626
                "Member must have length less than or equal to 4096"
1627
            )
1628

1629
    def _validate_grant_request(self, data: dict):
1✔
1630
        if "KeyId" not in data or "GranteePrincipal" not in data or "Operations" not in data:
1✔
UNCOV
1631
            raise ValidationError("Grant ID, key ID and grantee principal must be specified")
×
1632

1633
        for operation in data["Operations"]:
1✔
1634
            if operation not in VALID_OPERATIONS:
1✔
UNCOV
1635
                raise ValidationError(
×
1636
                    f"Value {['Operations']} at 'operations' failed to satisfy constraint: Member must satisfy"
1637
                    f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]"
1638
                )
1639

1640
    def _extract_attestation_pubkey(self, attestation_document: bytes) -> RSAPublicKey:
        """Load the DER-encoded public key embedded in an attestation document.

        The document is CBOR-decoded, the element at index 2 of the result is CBOR-decoded
        again, and the ``"public_key"`` entry of that inner object is loaded as a DER key.
        """
        # The attestation document comes as a COSE (CBOR Object Signing and Encryption) object: the CBOR
        # attestation is signed and then the attestation and signature are again CBOR-encoded.  For now
        # we don't bother validating the signature, though in the future we could.
        encoded_attestation = cbor2_loads(attestation_document)[2]
        attestation = cbor2_loads(encoded_attestation)
        return load_der_public_key(attestation["public_key"])
1✔
1648

1649
    def _decrypt_wrapped_key_material(
        self,
        import_state: KeyImportState,
        encrypted_key_material: CiphertextType,
    ) -> bytes:
        """Unwrap imported key material using the wrapping algorithm stored in ``import_state``.

        The decryption key is ``import_state.key.crypto_key.key``.

        Raises:
            KMSInvalidStateException: if the wrapping algorithm is not one of those handled below.
        """
        algo = import_state.wrapping_algo
        unwrapping_key = import_state.key.crypto_key.key

        if algo == AlgorithmSpec.RSAES_PKCS1_V1_5:
            return unwrapping_key.decrypt(encrypted_key_material, padding.PKCS1v15())

        if algo == AlgorithmSpec.RSAES_OAEP_SHA_1:
            oaep_sha1 = padding.OAEP(padding.MGF1(hashes.SHA1()), hashes.SHA1(), None)
            return unwrapping_key.decrypt(encrypted_key_material, oaep_sha1)

        if algo == AlgorithmSpec.RSAES_OAEP_SHA_256:
            oaep_sha256 = padding.OAEP(padding.MGF1(hashes.SHA256()), hashes.SHA256(), None)
            return unwrapping_key.decrypt(encrypted_key_material, oaep_sha256)

        if algo == AlgorithmSpec.RSA_AES_KEY_WRAP_SHA_256:
            # The blob is split at the RSA key size in bytes: the leading part is RSA-decrypted
            # to obtain an AES key, which then unwraps the remaining part.
            split_at = unwrapping_key.key_size // 8
            wrapped_aes_key = encrypted_key_material[:split_at]
            wrapped_key_material = encrypted_key_material[split_at:]

            aes_key = unwrapping_key.decrypt(
                wrapped_aes_key,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None,
                ),
            )
            return keywrap.aes_key_unwrap_with_padding(
                aes_key, wrapped_key_material, default_backend()
            )

        raise KMSInvalidStateException(
            f"Unsupported padding, requested wrapping algorithm: '{algo}'"
        )
1689

1690
    def _validate_plaintext_key_type_based(
        self,
        plaintext: PlaintextType,
        key: KmsKey,
        encryption_algorithm: EncryptionAlgorithmSpec = None,
    ):
        """Reject plaintexts that exceed the size limit for the key spec / algorithm pair.

        The limit is 4096 bytes unless the key spec and encryption algorithm match one of
        the RSA/OAEP combinations in the table below.

        Raises:
            ValidationException: if ``len(plaintext)`` exceeds the applicable limit.
        """
        # max size values extracted from AWS boto3 documentation
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kms/client/encrypt.html
        # Each row is (key spec, encryption algorithm, max plaintext size in bytes).
        rsa_oaep_limits = (
            (KeySpec.RSA_2048, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1, 214),
            (KeySpec.RSA_2048, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256, 190),
            (KeySpec.RSA_3072, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1, 342),
            (KeySpec.RSA_3072, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256, 318),
            (KeySpec.RSA_4096, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1, 470),
            (KeySpec.RSA_4096, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256, 446),
        )
        key_spec = key.metadata["KeySpec"]

        max_size_bytes = 4096  # max allowed size
        for spec, algorithm, size_limit in rsa_oaep_limits:
            if key_spec == spec and encryption_algorithm == algorithm:
                max_size_bytes = size_limit
                break

        if len(plaintext) > max_size_bytes:
            raise ValidationException(
                f"Algorithm {encryption_algorithm} and key spec {key_spec} cannot encrypt data larger than {max_size_bytes} bytes."
            )
1734

1735

1736
# ---------------
1737
# UTIL FUNCTIONS
1738
# ---------------
1739

1740
# Different AWS services have some internal integrations with KMS. Some create keys, that are used to encrypt/decrypt
1741
# customer's data. Such keys can't be created from outside for security reasons. So AWS services use some internal
1742
# APIs to do that. Functions here are supposed to be used by other LocalStack services to have similar integrations
1743
# with KMS in LocalStack. As such, they are supposed to be proper APIs (as in error and security handling),
1744
# just with more features.
1745

1746

1747
def set_key_managed(key_id: str, account_id: str, region_name: str) -> None:
    """Set the ``KeyManager`` metadata of the given key to ``"AWS"``.

    The key is looked up through ``KmsProvider._get_kms_key`` in the given account and region.
    """
    managed_key = KmsProvider._get_kms_key(account_id, region_name, key_id)
    managed_key.metadata["KeyManager"] = "AWS"
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc