• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19844934392

01 Dec 2025 07:55PM UTC coverage: 86.945% (+0.1%) from 86.821%
19844934392

push

github

web-flow
Update ASF APIs, provider signatures, disable lambda patches (#13444)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>
Co-authored-by: Silvio Vasiljevic <silvio.vasiljevic@gmail.com>

69707 of 80174 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.4
/localstack-core/localstack/services/kms/provider.py
1
import base64
1✔
2
import copy
1✔
3
import datetime
1✔
4
import logging
1✔
5
import os
1✔
6

7
from cbor2 import loads as cbor2_loads
1✔
8
from cryptography.exceptions import InvalidTag
1✔
9
from cryptography.hazmat.backends import default_backend
1✔
10
from cryptography.hazmat.primitives import hashes, keywrap
1✔
11
from cryptography.hazmat.primitives.asymmetric import padding
1✔
12
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
1✔
13
from cryptography.hazmat.primitives.serialization import load_der_public_key
1✔
14

15
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
16
from localstack.aws.api.kms import (
1✔
17
    AlgorithmSpec,
18
    AlreadyExistsException,
19
    BackingKeyIdType,
20
    CancelKeyDeletionRequest,
21
    CancelKeyDeletionResponse,
22
    CiphertextType,
23
    CreateAliasRequest,
24
    CreateGrantRequest,
25
    CreateGrantResponse,
26
    CreateKeyRequest,
27
    CreateKeyResponse,
28
    DataKeyPairSpec,
29
    DateType,
30
    DecryptResponse,
31
    DeleteAliasRequest,
32
    DeleteImportedKeyMaterialResponse,
33
    DeriveSharedSecretResponse,
34
    DescribeKeyRequest,
35
    DescribeKeyResponse,
36
    DisabledException,
37
    DisableKeyRequest,
38
    DisableKeyRotationRequest,
39
    EnableKeyRequest,
40
    EnableKeyRotationRequest,
41
    EncryptionAlgorithmSpec,
42
    EncryptionContextType,
43
    EncryptResponse,
44
    ExpirationModelType,
45
    GenerateDataKeyPairResponse,
46
    GenerateDataKeyPairWithoutPlaintextResponse,
47
    GenerateDataKeyRequest,
48
    GenerateDataKeyResponse,
49
    GenerateDataKeyWithoutPlaintextRequest,
50
    GenerateDataKeyWithoutPlaintextResponse,
51
    GenerateMacRequest,
52
    GenerateMacResponse,
53
    GenerateRandomRequest,
54
    GenerateRandomResponse,
55
    GetKeyPolicyRequest,
56
    GetKeyPolicyResponse,
57
    GetKeyRotationStatusRequest,
58
    GetKeyRotationStatusResponse,
59
    GetParametersForImportResponse,
60
    GetPublicKeyResponse,
61
    GrantIdType,
62
    GrantTokenList,
63
    GrantTokenType,
64
    ImportKeyMaterialResponse,
65
    ImportType,
66
    IncorrectKeyException,
67
    InvalidCiphertextException,
68
    InvalidGrantIdException,
69
    InvalidKeyUsageException,
70
    KeyAgreementAlgorithmSpec,
71
    KeyIdType,
72
    KeyMaterialDescriptionType,
73
    KeySpec,
74
    KeyState,
75
    KeyUsageType,
76
    KmsApi,
77
    KMSInvalidStateException,
78
    LimitType,
79
    ListAliasesResponse,
80
    ListGrantsRequest,
81
    ListGrantsResponse,
82
    ListKeyPoliciesRequest,
83
    ListKeyPoliciesResponse,
84
    ListKeysRequest,
85
    ListKeysResponse,
86
    ListResourceTagsRequest,
87
    ListResourceTagsResponse,
88
    MacAlgorithmSpec,
89
    MarkerType,
90
    MultiRegionKey,
91
    MultiRegionKeyType,
92
    NotFoundException,
93
    NullableBooleanType,
94
    OriginType,
95
    PlaintextType,
96
    PrincipalIdType,
97
    PublicKeyType,
98
    PutKeyPolicyRequest,
99
    RecipientInfo,
100
    ReEncryptResponse,
101
    ReplicateKeyRequest,
102
    ReplicateKeyResponse,
103
    RotateKeyOnDemandRequest,
104
    RotateKeyOnDemandResponse,
105
    ScheduleKeyDeletionRequest,
106
    ScheduleKeyDeletionResponse,
107
    SignRequest,
108
    SignResponse,
109
    TagResourceRequest,
110
    UnsupportedOperationException,
111
    UntagResourceRequest,
112
    UpdateAliasRequest,
113
    UpdateKeyDescriptionRequest,
114
    VerifyMacRequest,
115
    VerifyMacResponse,
116
    VerifyRequest,
117
    VerifyResponse,
118
    WrappingKeySpec,
119
)
120
from localstack.services.kms.exceptions import ValidationException
1✔
121
from localstack.services.kms.models import (
1✔
122
    MULTI_REGION_PATTERN,
123
    PATTERN_UUID,
124
    RESERVED_ALIASES,
125
    KeyImportState,
126
    KmsAlias,
127
    KmsCryptoKey,
128
    KmsGrant,
129
    KmsKey,
130
    KmsStore,
131
    deserialize_ciphertext_blob,
132
    kms_stores,
133
)
134
from localstack.services.kms.utils import (
1✔
135
    execute_dry_run_capable,
136
    is_valid_key_arn,
137
    parse_key_arn,
138
    validate_alias_name,
139
)
140
from localstack.services.plugins import ServiceLifecycleHook
1✔
141
from localstack.state import StateVisitor
1✔
142
from localstack.utils.aws.arns import get_partition, kms_alias_arn, parse_arn
1✔
143
from localstack.utils.collections import PaginatedList
1✔
144
from localstack.utils.common import select_attributes
1✔
145
from localstack.utils.crypto import pkcs7_envelope_encrypt
1✔
146
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
147

148
LOG = logging.getLogger(__name__)
1✔
149

150
# Valid operation names.
# NOTE(review): the entries (e.g. "ReEncryptFrom", "ReEncryptTo", "RetireGrant") look like KMS
# grant operation names, so this is presumably the allow-list applied when validating grant
# requests - confirm against the code that consumes this constant.
VALID_OPERATIONS = [
    "CreateKey",
    "Decrypt",
    "Encrypt",
    "GenerateDataKey",
    "GenerateDataKeyWithoutPlaintext",
    "ReEncryptFrom",
    "ReEncryptTo",
    "Sign",
    "Verify",
    "GetPublicKey",
    "CreateGrant",
    "RetireGrant",
    "DescribeKey",
    "GenerateDataKeyPair",
    "GenerateDataKeyPairWithoutPlaintext",
]
168

169

170
class ValidationError(CommonServiceException):
    """
    Generic ``ValidationError`` service exception.

    The error type is described in the AWS docs but is not part of the botocore spec, which is
    why it is declared here on top of ``CommonServiceException``.
    """

    def __init__(self, message=None):
        # The error code is fixed; only the message varies between call sites.
        error_code = "ValidationError"
        super().__init__(error_code, message=message)
×
175

176

177
# For all operations constraints for states of keys are based on
178
# https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
179
class KmsProvider(KmsApi, ServiceLifecycleHook):
1✔
180
    """
181
    The LocalStack Key Management Service (KMS) provider.
182

183
    Cross-account access is supported by the following operations, where a key belonging
    to another account can be referenced by its key ARN.
185
    - CreateGrant
186
    - DescribeKey
187
    - GetKeyRotationStatus
188
    - GetPublicKey
189
    - ListGrants
190
    - RetireGrant
191
    - RevokeGrant
192
    - Decrypt
193
    - Encrypt
194
    - GenerateDataKey
195
    - GenerateDataKeyPair
196
    - GenerateDataKeyPairWithoutPlaintext
197
    - GenerateDataKeyWithoutPlaintext
198
    - GenerateMac
199
    - ReEncrypt
200
    - Sign
201
    - Verify
202
    - VerifyMac
203
    """
204

205
    def accept_state_visitor(self, visitor: StateVisitor):
        """Pass the KMS stores (``kms_stores``) to the given state visitor."""
        visitor.visit(kms_stores)
×
207

208
    #
209
    # Helpers
210
    #
211

212
    @staticmethod
    def _get_store(account_id: str, region_name: str) -> KmsStore:
        """
        Return the KMS store of the given account and region.

        :param account_id: account the store belongs to
        :param region_name: region the store belongs to
        :return: the matching entry of ``kms_stores``
        """
        account_stores = kms_stores[account_id]
        return account_stores[region_name]
1✔
215

216
    @staticmethod
    def _create_kms_alias(account_id: str, region_name: str, request: CreateAliasRequest):
        """
        Build a ``KmsAlias`` from the request and register it under its ``AliasName``.

        :param account_id: account whose store receives the alias
        :param region_name: region whose store receives the alias
        :param request: alias creation request; its ``AliasName`` is used as the store key
        """
        target_store = kms_stores[account_id][region_name]
        new_alias = KmsAlias(request, account_id, region_name)
        target_store.aliases[request.get("AliasName")] = new_alias
1✔
222

223
    @staticmethod
    def _create_kms_key(
        account_id: str, region_name: str, request: CreateKeyRequest = None
    ) -> KmsKey:
        """
        Build a ``KmsKey`` from the request and register it in the store under its ``KeyId``.

        :param account_id: account whose store receives the key
        :param region_name: region whose store receives the key
        :param request: key creation request handed to ``KmsKey``
        :return: the newly created key
        """
        target_store = kms_stores[account_id][region_name]
        new_key = KmsKey(request, account_id, region_name)
        target_store.keys[new_key.metadata["KeyId"]] = new_key
        return new_key
1✔
232

233
    @staticmethod
    def _get_key_id_from_any_id(account_id: str, region_name: str, some_id: str) -> str:
        """
        Translate any supported key identifier into the ID of a key present in the store.

        Accepted identifier forms:
        - key ID
        - key ARN
        - key alias
        - key alias ARN

        :param account_id: account whose store is searched
        :param region_name: region whose store is searched
        :param some_id: one of the identifier forms listed above
        :return: ID of the matching key
        :raises NotFoundException: if a key ARN points to a different region, the alias is
            unknown, the key ID has neither the UUID nor the multi-region format, or the key is
            not in the store
        :raises ValueError: if the value is an ARN of neither a key nor an alias
        """
        alias_name = key_id = key_arn = None
        is_arn = some_id.startswith("arn:")

        if is_arn and ":alias/" in some_id:
            alias_name = "alias/" + some_id.split(":alias/")[1]
        elif is_arn and ":key/" in some_id:
            key_arn = some_id
            key_id = some_id.split(":key/")[1]
            arn_region = parse_arn(some_id)["region"]
            if arn_region != region_name:
                raise NotFoundException(f"Invalid arn {arn_region}")
        elif is_arn:
            raise ValueError(
                f"Supplied value of {some_id} is an ARN, but neither of a KMS key nor of a KMS key "
                f"alias"
            )
        elif some_id.startswith("alias/"):
            alias_name = some_id
        else:
            key_id = some_id

        store = kms_stores[account_id][region_name]

        if alias_name:
            # Reserved aliases are created on first use, so make sure one exists before lookup.
            KmsProvider._create_alias_if_reserved_and_not_exists(
                account_id, region_name, alias_name
            )
            if alias_name in store.aliases:
                key_id = store.aliases[alias_name].metadata["TargetKeyId"]
            else:
                raise NotFoundException(f"Unable to find KMS alias with name {alias_name}")

        # regular KeyId are UUID, and MultiRegion keys starts with 'mrk-' and 32 hex chars
        if not (PATTERN_UUID.match(key_id) or MULTI_REGION_PATTERN.match(key_id)):
            raise NotFoundException(f"Invalid keyId '{key_id}'")

        if key_id in store.keys:
            return key_id

        # Report the missing key by ARN; build one when the caller did not supply it.
        if not key_arn:
            partition = get_partition(region_name)
            key_arn = f"arn:{partition}:kms:{region_name}:{account_id}:key/{key_id}"
        raise NotFoundException(f"Key '{key_arn}' does not exist")
1✔
290

291
    @staticmethod
    def _create_alias_if_reserved_and_not_exists(
        account_id: str, region_name: str, alias_name: str
    ):
        """
        Create a reserved alias, together with a fresh backing key, if it is not in the store yet.

        Does nothing when ``alias_name`` is not in ``RESERVED_ALIASES`` or is already registered.

        :param account_id: account whose store is checked and updated
        :param region_name: region whose store is checked and updated
        :param alias_name: alias name to check
        """
        store = kms_stores[account_id][region_name]
        must_create = alias_name in RESERVED_ALIASES and alias_name not in store.aliases
        if not must_create:
            return

        # The backing key is created from an empty request.
        backing_key = KmsProvider._create_kms_key(account_id, region_name, {})
        alias_request = CreateAliasRequest(
            AliasName=alias_name, TargetKeyId=backing_key.metadata.get("KeyId")
        )
        KmsProvider._create_kms_alias(account_id, region_name, alias_request)
×
306

307
    # While in AWS keys have more than Enabled, Disabled and PendingDeletion states, we currently only model these 3
308
    # in LocalStack, so this function is limited to them.
309
    #
310
    # The current default values are based on most of the operations working in AWS with enabled keys, but failing with
311
    # disabled and those pending deletion.
312
    #
313
    # If we decide to use the other states as well, we might want to come up with a better key state validation per
314
    # operation. Can consult this page for what states are supported by various operations:
315
    # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
316
    @staticmethod
    def _get_kms_key(
        account_id: str,
        region_name: str,
        any_type_of_key_id: str,
        any_key_state_allowed: bool = False,
        enabled_key_allowed: bool = True,
        disabled_key_allowed: bool = False,
        pending_deletion_key_allowed: bool = False,
    ) -> KmsKey:
        """
        Look up a key by any supported identifier and check its state against the allowed ones.

        :param account_id: account whose store is searched
        :param region_name: region whose store is searched
        :param any_type_of_key_id: key ID, key ARN, alias name or alias ARN
        :param any_key_state_allowed: if set, overrides the three flags below and allows all states
        :param enabled_key_allowed: accept keys in state ``Enabled``
        :param disabled_key_allowed: accept keys in state ``Disabled``
        :param pending_deletion_key_allowed: accept keys in state ``PendingDeletion``
        :return: the key found in the store
        :raises ValueError: if every key state is prohibited
        :raises DisabledException: if the key is disabled and that is not allowed
        :raises KMSInvalidStateException: if the key is pending deletion or enabled and that
            state is not allowed
        """
        store = kms_stores[account_id][region_name]

        if any_key_state_allowed:
            enabled_key_allowed = disabled_key_allowed = pending_deletion_key_allowed = True
        elif not any((enabled_key_allowed, disabled_key_allowed, pending_deletion_key_allowed)):
            raise ValueError("A key is requested, but all possible key states are prohibited")

        key_id = KmsProvider._get_key_id_from_any_id(account_id, region_name, any_type_of_key_id)
        key = store.keys[key_id]
        key_state = key.metadata.get("KeyState")
        key_arn = key.metadata.get("Arn")

        if key_state == "Disabled" and not disabled_key_allowed:
            raise DisabledException(f"{key_arn} is disabled.")
        if key_state == "PendingDeletion" and not pending_deletion_key_allowed:
            raise KMSInvalidStateException(f"{key_arn} is pending deletion.")
        if key_state == "Enabled" and not enabled_key_allowed:
            raise KMSInvalidStateException(
                f"{key_arn} is enabled, but the operation doesn't support such a state"
            )
        return key
1✔
348

349
    @staticmethod
    def _get_kms_alias(account_id: str, region_name: str, alias_name_or_arn: str) -> KmsAlias:
        """
        Return the alias registered under the given alias name or alias ARN.

        :param account_id: account whose store is searched
        :param region_name: region whose store is searched
        :param alias_name_or_arn: alias name, or an ARN containing ``:alias/``
        :return: the matching alias
        :raises ValidationException: if an ARN is given that does not contain ``:alias/``
        :raises NotFoundException: if no alias with that name is in the store
        """
        store = kms_stores[account_id][region_name]

        alias_name = alias_name_or_arn
        if alias_name_or_arn.startswith("arn:"):
            if ":alias/" not in alias_name_or_arn:
                raise ValidationException(f"{alias_name_or_arn} is not a valid alias ARN")
            alias_name = "alias/" + alias_name_or_arn.split(":alias/")[1]

        validate_alias_name(alias_name)

        if alias_name in store.aliases:
            return store.aliases.get(alias_name)

        # AWS itself uses AliasArn instead of AliasName in this exception.
        missing_alias_arn = kms_alias_arn(alias_name, account_id, region_name)
        raise NotFoundException(f"Alias {missing_alias_arn} is not found.")
1✔
368

369
    @staticmethod
    def _parse_key_id(key_id_or_arn: str, context: RequestContext) -> tuple[str, str, str]:
        """
        Determine which account, region and key ID a key identifier refers to.

        A valid key ARN supplies all three values itself; anything else is returned unchanged
        together with the account ID and region of the request context.

        :param key_id_or_arn: KMS key ID or ARN
        :param context: request context
        :return: Tuple of account ID, region name and key ID
        :raises NotFoundException: if the ARN's region differs from the request region
        """
        if not is_valid_key_arn(key_id_or_arn):
            return context.account_id, context.region, key_id_or_arn

        arn_account_id, arn_region, arn_key_id = parse_key_arn(key_id_or_arn)
        if arn_region != context.region:
            raise NotFoundException(f"Invalid arn {arn_region}")
        return arn_account_id, arn_region, arn_key_id
1✔
387

388
    @staticmethod
    def _is_rsa_spec(key_spec: str) -> bool:
        """Return whether ``key_spec`` is one of the RSA key specs (2048, 3072 or 4096 bit)."""
        rsa_specs = (KeySpec.RSA_2048, KeySpec.RSA_3072, KeySpec.RSA_4096)
        return key_spec in rsa_specs
1✔
391

392
    #
393
    # Operation Handlers
394
    #
395

396
    @handler("CreateKey", expand=False)
    def create_key(
        self,
        context: RequestContext,
        request: CreateKeyRequest = None,
    ) -> CreateKeyResponse:
        """Create a key in the caller's account and region and return its metadata."""
        created_key = self._create_kms_key(context.account_id, context.region, request)
        response = CreateKeyResponse(KeyMetadata=created_key.metadata)
        return response
1✔
404

405
    @handler("ScheduleKeyDeletion", expand=False)
    def schedule_key_deletion(
        self, context: RequestContext, request: ScheduleKeyDeletionRequest
    ) -> ScheduleKeyDeletionResponse:
        """
        Schedule an enabled or disabled key for deletion.

        ``PendingWindowInDays`` defaults to 30 and must lie between 7 and 30 (inclusive).

        :raises ValidationException: if the pending window is outside the accepted range
        """
        window_days = int(request.get("PendingWindowInDays", 30))
        if not 7 <= window_days <= 30:
            raise ValidationException(
                f"PendingWindowInDays should be between 7 and 30, but it is {window_days}"
            )

        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key.schedule_key_deletion(window_days)

        response_fields = select_attributes(key.metadata, ["DeletionDate", "KeyId", "KeyState"])
        response_fields["PendingWindowInDays"] = window_days
        return ScheduleKeyDeletionResponse(**response_fields)
1✔
426

427
    @handler("CancelKeyDeletion", expand=False)
    def cancel_key_deletion(
        self, context: RequestContext, request: CancelKeyDeletionRequest
    ) -> CancelKeyDeletionResponse:
        """
        Cancel the scheduled deletion of a key.

        Only keys pending deletion are accepted. The key ends up in state ``Disabled`` with its
        ``DeletionDate`` cleared.
        """
        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=False,
            pending_deletion_key_allowed=True,
        )
        metadata = key.metadata
        metadata["KeyState"] = KeyState.Disabled
        metadata["DeletionDate"] = None
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_CancelKeyDeletion.html#API_CancelKeyDeletion_ResponseElements
        # "The Amazon Resource Name (key ARN) of the KMS key whose deletion is canceled."
        return CancelKeyDeletionResponse(KeyId=metadata.get("Arn"))
1✔
443

444
    @handler("DisableKey", expand=False)
    def disable_key(self, context: RequestContext, request: DisableKeyRequest) -> None:
        """Mark a key as disabled (``KeyState`` set to ``Disabled``, ``Enabled`` set to False)."""
        # Technically, AWS allows DisableKey for keys that are already disabled.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        metadata = target_key.metadata
        metadata["KeyState"] = KeyState.Disabled
        metadata["Enabled"] = False
1✔
456

457
    @handler("EnableKey", expand=False)
    def enable_key(self, context: RequestContext, request: EnableKeyRequest) -> None:
        """Mark a key as enabled (``KeyState`` set to ``Enabled``, ``Enabled`` set to True)."""
        # Both enabled and disabled keys are accepted here.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        metadata = target_key.metadata
        metadata["KeyState"] = KeyState.Enabled
        metadata["Enabled"] = True
1✔
468

469
    @handler("ListKeys", expand=False)
    def list_keys(self, context: RequestContext, request: ListKeysRequest) -> ListKeysResponse:
        """Return one page of the key ID / key ARN pairs of the caller's account and region."""
        store = self._get_store(context.account_id, context.region)

        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeys.html#API_ListKeys_ResponseSyntax
        # Out of whole KeyMetadata only two fields are present in the response.
        key_entries = [
            {"KeyId": key.metadata["KeyId"], "KeyArn": key.metadata["Arn"]}
            for key in store.keys.values()
        ]

        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeys.html#API_ListKeys_RequestParameters
        # Regarding the default value of Limit: "If you do not include a value, it defaults to 100."
        page, next_marker = PaginatedList(key_entries).get_page(
            lambda entry: entry.get("KeyId"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 100),
        )

        # The pagination fields are only included when another page follows.
        if next_marker:
            return ListKeysResponse(Keys=page, NextMarker=next_marker, Truncated=True)
        return ListKeysResponse(Keys=page)
1✔
488

489
    @handler("DescribeKey", expand=False)
    def describe_key(
        self, context: RequestContext, request: DescribeKeyRequest
    ) -> DescribeKeyResponse:
        """Return the metadata of a key in any state; the key may be addressed by ARN."""
        key_location = self._parse_key_id(request["KeyId"], context)
        described_key = self._get_kms_key(*key_location, any_key_state_allowed=True)
        return DescribeKeyResponse(KeyMetadata=described_key.metadata)
1✔
496

497
    @handler("ReplicateKey", expand=False)
    def replicate_key(
        self, context: RequestContext, request: ReplicateKeyRequest
    ) -> ReplicateKeyResponse:
        """
        Replicate a multi-region primary key into the store of ``ReplicaRegion``.

        The replica is a deep copy of the primary key with its metadata adapted through
        ``replicate_metadata``. It is stored under the same key ID in the replica region and
        recorded in the primary key's replica list.

        :raises UnsupportedOperationException: if the key is not a multi-region primary key
        :raises AlreadyExistsException: if the replica region already holds a key with this ID
        """
        account_id = context.account_id
        source_key = self._get_kms_key(account_id, context.region, request.get("KeyId"))
        source_metadata = source_key.metadata
        key_id = source_metadata.get("KeyId")
        key_arn = source_metadata.get("Arn")

        if not source_metadata.get("MultiRegion"):
            raise UnsupportedOperationException(
                f"Unable to replicate a non-MultiRegion key {key_id}"
            )

        replica_region = request.get("ReplicaRegion")
        replica_store = kms_stores[account_id][replica_region]

        multi_region_config = source_metadata.get("MultiRegionConfiguration", {})
        if multi_region_config.get("MultiRegionKeyType") != MultiRegionKeyType.PRIMARY:
            raise UnsupportedOperationException(f"{key_arn} is not a multi-region primary key.")

        if key_id in replica_store.keys:
            raise AlreadyExistsException(
                f"Unable to replicate key {key_id} to region {replica_region}, as the key "
                f"already exist there"
            )

        replica_key = copy.deepcopy(source_key)
        replica_key.replicate_metadata(request, account_id, replica_region)
        replica_store.keys[key_id] = replica_key

        self.update_primary_key_with_replica_keys(source_key, replica_key, replica_region)

        # CurrentKeyMaterialId is not returned in the ReplicaKeyMetadata. May be due to not being evaluated until
        # the key has been successfully replicated as it does not show up in DescribeKey immediately either.
        response_metadata = copy.deepcopy(replica_key.metadata)
        response_metadata.pop("CurrentKeyMaterialId", None)

        return ReplicateKeyResponse(ReplicaKeyMetadata=response_metadata)
1✔
535

536
    @staticmethod
    def update_primary_key_with_replica_keys(key: KmsKey, replica_key: KmsKey, region: str):
        """
        Add a new multi-region replica key to the primary key's metadata.

        :param key: primary key whose ``ReplicaKeys`` list is extended
        :param replica_key: replica whose ARN is recorded
        :param region: region the replica lives in
        """
        replica_entry = MultiRegionKey(Arn=replica_key.metadata["Arn"], Region=region)
        replica_keys = key.metadata["MultiRegionConfiguration"]["ReplicaKeys"]
        replica_keys.append(replica_entry)
545

546
    @handler("UpdateKeyDescription", expand=False)
    def update_key_description(
        self, context: RequestContext, request: UpdateKeyDescriptionRequest
    ) -> None:
        """Replace the ``Description`` of an enabled or disabled key with the requested one."""
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        new_description = request.get("Description")
        target_key.metadata["Description"] = new_description
1✔
558

559
    @handler("CreateGrant", expand=False)
    def create_grant(
        self, context: RequestContext, request: CreateGrantRequest
    ) -> CreateGrantResponse:
        """
        Create a grant for a key, or return the existing grant registered under the same name.

        The request's ``KeyId`` is overwritten with the resolved key ID before validation.
        Grants are kept in the store of the caller's account and region.
        """
        key_account_id, key_region_name, raw_key_id = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(key_account_id, key_region_name, raw_key_id)

        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        key_id = key.metadata.get("KeyId")
        request["KeyId"] = key_id
        self._validate_grant_request(request)
        grant_name = request.get("Name")

        store = self._get_store(context.account_id, context.region)
        name_lookup = (grant_name, key_id)
        if grant_name and name_lookup in store.grant_names:
            # Named grants are idempotent: hand back the one created earlier.
            grant = store.grants[store.grant_names[name_lookup]]
        else:
            grant = KmsGrant(request, context.account_id, context.region)
            new_grant_id = grant.metadata["GrantId"]
            store.grants[new_grant_id] = grant
            if grant_name:
                store.grant_names[name_lookup] = new_grant_id
            store.grant_tokens[grant.token] = new_grant_id

        # At the moment we do not support multiple GrantTokens for grant creation request. Instead, we always use
        # the same token. For the reference, AWS documentation says:
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateGrant.html#API_CreateGrant_RequestParameters
        # "The returned grant token is unique with every CreateGrant request, even when a duplicate GrantId is
        # returned". "A duplicate GrantId" refers to the idempotency of grant creation requests - if a request has
        # "Name" field, and if such name already belongs to a previously created grant, no new grant gets created
        # and the existing grant with the name is returned.
        return CreateGrantResponse(GrantId=grant.metadata["GrantId"], GrantToken=grant.token)
1✔
592

593
    @handler("ListGrants", expand=False)
    def list_grants(
        self, context: RequestContext, request: ListGrantsRequest
    ) -> ListGrantsResponse:
        """
        List the grants of a key, optionally narrowed by ``GrantId`` or ``GranteePrincipal``.

        With a ``GrantId`` the response holds exactly that grant. Otherwise the grants of the
        key are filtered and returned page by page (default page size 50).

        :raises ValidationError: if ``KeyId`` is missing
        :raises InvalidGrantIdException: if ``GrantId`` is given but unknown to the store
        """
        if not request.get("KeyId"):
            raise ValidationError("Required input parameter KeyId not specified")
        key_account_id, key_region_name, _ = self._parse_key_id(request["KeyId"], context)
        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        key = self._get_kms_key(
            key_account_id, key_region_name, request.get("KeyId"), any_key_state_allowed=True
        )
        key_id = key.metadata.get("KeyId")

        store = self._get_store(context.account_id, context.region)
        requested_grant_id = request.get("GrantId")
        if requested_grant_id:
            if requested_grant_id in store.grants:
                return ListGrantsResponse(Grants=[store.grants[requested_grant_id].metadata])
            raise InvalidGrantIdException()

        grantee_filter = request.get("GranteePrincipal")
        matching_grants = []
        for grant in store.grants.values():
            # KeyId is a mandatory field of ListGrants request, so is going to be present.
            _, _, grant_key_id = parse_key_arn(grant.metadata["KeyArn"])
            # GranteePrincipal is a mandatory field for CreateGrant, should be in grants. But it is an optional field
            # for ListGrants, so might not be there.
            if grant_key_id == key_id and (
                not grantee_filter or grant.metadata["GranteePrincipal"] == grantee_filter
            ):
                matching_grants.append(grant.metadata)

        page, next_marker = PaginatedList(matching_grants).get_page(
            lambda grant_data: grant_data.get("GrantId"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 50),
        )

        # The pagination fields are only included when another page follows.
        if next_marker:
            return ListGrantsResponse(Grants=page, NextMarker=next_marker, Truncated=True)
        return ListGrantsResponse(Grants=page)
1✔
636

637
    @staticmethod
    def _delete_grant(store: KmsStore, grant_id: str, key_id: str):
        """
        Remove a grant and its token and name entries from the store.

        :param store: store holding the grant
        :param grant_id: ID of the grant to remove; must be present in ``store.grants``
        :param key_id: key ID the grant is expected to belong to
        :raises ValidationError: if the grant belongs to a different key
        """
        grant = store.grants[grant_id]
        grant_metadata = grant.metadata

        _, _, owning_key_id = parse_key_arn(grant_metadata.get("KeyArn"))
        if owning_key_id != key_id:
            raise ValidationError(f"Invalid KeyId={key_id} specified for grant {grant_id}")

        store.grant_tokens.pop(grant.token)
        # Not every grant has a name, hence the default for the name entry.
        store.grant_names.pop((grant_metadata.get("Name"), key_id), None)
        store.grants.pop(grant_id)
1✔
648

649
    def revoke_grant(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        grant_id: GrantIdType,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> None:
        """
        Delete a grant of the given key from the store of the caller's account and region.

        :raises InvalidGrantIdException: if the grant ID is unknown to the store
        """
        # TODO add support for "dry_run"
        key_location = self._parse_key_id(key_id, context)
        key = self._get_kms_key(*key_location, any_key_state_allowed=True)
        resolved_key_id = key.metadata.get("KeyId")

        store = self._get_store(context.account_id, context.region)
        if grant_id in store.grants:
            self._delete_grant(store, grant_id, resolved_key_id)
        else:
            raise InvalidGrantIdException()
1✔
668

669
    def retire_grant(
        self,
        context: RequestContext,
        grant_token: GrantTokenType = None,
        key_id: KeyIdType = None,
        grant_id: GrantIdType = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> None:
        """
        Retire a grant identified either by its grant token or by (grant ID, key ID).

        With a grant token, the store is selected from the account and region encoded in the
        token and the grant ID is looked up there. Otherwise the store of the caller's account
        and region is used.

        :param context: request context
        :param grant_token: base64 token that decodes to three ``:``-separated fields, the first
            two being account ID and region
        :param key_id: key ID or ARN the grant belongs to; required when no token is given
        :param grant_id: ID of the grant; required when no token is given
        :param dry_run: currently ignored
        :raises ValidationException: if neither a token nor both grant ID and key ID are given
        :raises NotFoundException: if the grant token is unknown
        :raises InvalidGrantIdException: if the grant ID is unknown to the selected store
        """
        # TODO add support for "dry_run"
        if not grant_token and (not grant_id or not key_id):
            raise ValidationException("Grant token OR (grant ID, key ID) must be specified")

        if grant_token:
            decoded_token = to_str(base64.b64decode(grant_token))
            grant_account_id, grant_region_name, _ = decoded_token.split(":")
            grant_store = self._get_store(grant_account_id, grant_region_name)

            if grant_token not in grant_store.grant_tokens:
                raise NotFoundException(f"Unable to find grant token {grant_token}")

            grant_id = grant_store.grant_tokens[grant_token]
        else:
            grant_store = self._get_store(context.account_id, context.region)

        if key_id:
            key_account_id, key_region_name, key_id = self._parse_key_id(key_id, context)
            key = self._get_kms_key(
                key_account_id, key_region_name, key_id, any_key_state_allowed=True
            )
            key_id = key.metadata.get("KeyId")

        # A caller-supplied grant ID may not exist. Report that the same way revoke_grant does
        # instead of failing with a KeyError on the store lookup below.
        if grant_id not in grant_store.grants:
            raise InvalidGrantIdException()

        if not key_id:
            # Token-only request: take the key ID from the grant itself.
            _, _, key_id = parse_key_arn(grant_store.grants[grant_id].metadata.get("KeyArn"))

        self._delete_grant(grant_store, grant_id, key_id)
1✔
704

705
    def list_retirable_grants(
        self,
        context: RequestContext,
        retiring_principal: PrincipalIdType,
        limit: LimitType = None,
        marker: MarkerType = None,
        **kwargs,
    ) -> ListGrantsResponse:
        """
        Return one page of the grants whose ``RetiringPrincipal`` equals the given principal.

        Only the store of the caller's account and region is searched. The page size defaults
        to 50 when ``limit`` is not set.

        :raises ValidationError: if ``retiring_principal`` is empty
        """
        if not retiring_principal:
            raise ValidationError("Required input parameter 'RetiringPrincipal' not specified")

        store = self._get_store(context.account_id, context.region)
        retirable = [
            grant.metadata
            for grant in store.grants.values()
            if grant.metadata.get("RetiringPrincipal") == retiring_principal
        ]

        limit = limit or 50
        page, next_marker = PaginatedList(retirable).get_page(
            lambda grant_data: grant_data.get("GrantId"),
            next_token=marker,
            page_size=limit,
        )

        # The pagination fields are only included when another page follows.
        if next_marker:
            return ListGrantsResponse(Grants=page, NextMarker=next_marker, Truncated=True)
        return ListGrantsResponse(Grants=page)
1✔
731

732
    def get_public_key(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        grant_tokens: GrantTokenList = None,
        **kwargs,
    ) -> GetPublicKeyResponse:
        """
        Return the public key of a KMS key together with its spec, usage and algorithm metadata.

        :param context: request context, used to resolve ``key_id`` into account, region and key ID
        :param key_id: identifier of the key whose public key is requested
        :param grant_tokens: accepted but not used by this implementation
        :return: response holding ``PublicKey``, ``KeyId`` (set to the key ARN) and the
            ``KeySpec``/``KeyUsage``/``EncryptionAlgorithms``/``SigningAlgorithms`` metadata
        """
        # According to https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html, GetPublicKey is
        # supposed to fail for disabled keys. But it actually doesn't fail in AWS, so both enabled and
        # disabled keys are accepted here.
        account_id, region_name, key_id = self._parse_key_id(key_id, context)
        kms_key = self._get_kms_key(
            account_id,
            region_name,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )

        copied_metadata = select_attributes(
            kms_key.metadata,
            ["KeySpec", "KeyUsage", "EncryptionAlgorithms", "SigningAlgorithms"],
        )
        response_fields = {
            **copied_metadata,
            "PublicKey": kms_key.crypto_key.public_key,
            "KeyId": kms_key.metadata["Arn"],
        }
        return GetPublicKeyResponse(**response_fields)
1✔
759

760
    def _generate_data_key_pair(
        self,
        context: RequestContext,
        key_id: str,
        key_pair_spec: str,
        encryption_context: EncryptionContextType = None,
        dry_run: NullableBooleanType = None,
    ):
        """
        Shared implementation behind GenerateDataKeyPair and GenerateDataKeyPairWithoutPlaintext.

        Resolves and validates the wrapping key and the requested key pair spec, then hands the
        actual response construction to ``execute_dry_run_capable`` together with ``dry_run``.

        :param context: request context, used to resolve ``key_id``
        :param key_id: identifier of the KMS key that wraps the generated private key
        :param key_pair_spec: spec of the data key pair to generate
        :param encryption_context: encryption context passed on when wrapping the private key
        :param dry_run: dry-run flag forwarded to ``execute_dry_run_capable``
        :return: whatever ``execute_dry_run_capable`` returns for ``_build_data_key_pair_response``
        """
        account_id, region_name, resolved_key_id = self._parse_key_id(key_id, context)
        wrapping_key = self._get_kms_key(account_id, region_name, resolved_key_id)

        # Both the wrapping key and the requested spec are checked before anything is generated.
        self._validate_key_for_encryption_decryption(context, wrapping_key)
        KmsCryptoKey.assert_valid(key_pair_spec)

        return execute_dry_run_capable(
            self._build_data_key_pair_response,
            dry_run,
            wrapping_key,
            key_pair_spec,
            encryption_context,
        )
775

776
    def _build_data_key_pair_response(
        self, key: KmsKey, key_pair_spec: str, encryption_context: EncryptionContextType = None
    ):
        """
        Generate a fresh data key pair and assemble the response fields for it.

        :param key: KMS key used to encrypt the generated private key
        :param key_pair_spec: spec passed to ``KmsCryptoKey`` to create the pair
        :param encryption_context: encryption context used when encrypting the private key
        :return: dict with ``KeyId`` (the wrapping key ARN), ``KeyPairSpec``, the private key both
            encrypted (``PrivateKeyCiphertextBlob``) and in plaintext (``PrivateKeyPlaintext``),
            and ``PublicKey``
        """
        data_key_pair = KmsCryptoKey(key_pair_spec)

        wrapping_key_arn = key.metadata["Arn"]
        wrapped_private_key = key.encrypt(data_key_pair.private_key, encryption_context)
        return {
            "KeyId": wrapping_key_arn,
            "KeyPairSpec": key_pair_spec,
            "PrivateKeyCiphertextBlob": wrapped_private_key,
            "PrivateKeyPlaintext": data_key_pair.private_key,
            "PublicKey": data_key_pair.public_key,
        }
788

789
    @handler("GenerateDataKeyPair")
    def generate_data_key_pair(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_pair_spec: DataKeyPairSpec,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        recipient: RecipientInfo = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> GenerateDataKeyPairResponse:
        """
        Generate a data key pair and return it including the plaintext private key.

        ``grant_tokens`` and ``recipient`` are accepted but not used by this implementation.

        :param context: request context, used to resolve ``key_id``
        :param key_id: identifier of the KMS key that wraps the private key
        :param key_pair_spec: spec of the data key pair to generate
        :param encryption_context: encryption context used when wrapping the private key
        :param dry_run: dry-run flag forwarded to the shared helper
        :return: response built from the fields produced by ``_generate_data_key_pair``
        """
        response_fields = self._generate_data_key_pair(
            context=context,
            key_id=key_id,
            key_pair_spec=key_pair_spec,
            encryption_context=encryption_context,
            dry_run=dry_run,
        )
        return GenerateDataKeyPairResponse(**response_fields)
1✔
805

806
    @handler("GenerateRandom", expand=False)
    def generate_random(
        self, context: RequestContext, request: GenerateRandomRequest
    ) -> GenerateRandomResponse:
        """
        Return ``NumberOfBytes`` random bytes obtained from ``os.urandom``.

        :param context: request context (not used)
        :param request: request whose ``NumberOfBytes`` must be present and within 1..1024
        :return: response carrying the random bytes as ``Plaintext``
        :raises ValidationException: if ``NumberOfBytes`` is missing, above 1024 or below 1
        """
        number_of_bytes = request.get("NumberOfBytes")

        if number_of_bytes is None:
            raise ValidationException("NumberOfBytes is required.")
        elif number_of_bytes > 1024:
            raise ValidationException(
                f"1 validation error detected: Value '{number_of_bytes}' at 'numberOfBytes' failed "
                "to satisfy constraint: Member must have value less than or equal to 1024"
            )
        elif number_of_bytes < 1:
            raise ValidationException(
                f"1 validation error detected: Value '{number_of_bytes}' at 'numberOfBytes' failed "
                "to satisfy constraint: Member must have value greater than or equal to 1"
            )

        return GenerateRandomResponse(Plaintext=os.urandom(number_of_bytes))
1✔
827

828
    @handler("GenerateDataKeyPairWithoutPlaintext")
    def generate_data_key_pair_without_plaintext(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_pair_spec: DataKeyPairSpec,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> GenerateDataKeyPairWithoutPlaintextResponse:
        """
        Generate a data key pair and return it without the plaintext private key.

        ``grant_tokens`` is accepted but not used by this implementation.

        :param context: request context, used to resolve ``key_id``
        :param key_id: identifier of the KMS key that wraps the private key
        :param key_pair_spec: spec of the data key pair to generate
        :param encryption_context: encryption context used when wrapping the private key
        :param dry_run: dry-run flag forwarded to the shared helper
        :return: response with the fields produced by ``_generate_data_key_pair`` minus
            ``PrivateKeyPlaintext``
        """
        result = self._generate_data_key_pair(
            context, key_id, key_pair_spec, encryption_context, dry_run
        )
        # The plaintext private key must never be part of this operation's response.
        result.pop("PrivateKeyPlaintext")
        # Build the response type that matches the declared return annotation (the previous code
        # constructed a GenerateDataKeyPairResponse here).
        return GenerateDataKeyPairWithoutPlaintextResponse(**result)
1✔
844

845
    # We currently act neither on the KeySpec setting (which is different from and holds values different than
846
    # KeySpec for CreateKey) nor on NumberOfBytes. Instead, we generate a key with a key length that is "standard" in
847
    # LocalStack.
848
    #
849
    def _generate_data_key(
        self, context: RequestContext, key_id: str, encryption_context: EncryptionContextType = None
    ):
        """
        Shared implementation behind GenerateDataKey and GenerateDataKeyWithoutPlaintext.

        A new ``SYMMETRIC_DEFAULT`` crypto key is created and its key material is returned both in
        plaintext and encrypted under the requested KMS key.

        :param context: request context, used to resolve ``key_id``
        :param key_id: identifier of the KMS key that encrypts the data key
        :param encryption_context: encryption context used when encrypting the data key
        :return: dict with ``KeyId`` (the KMS key ARN), ``Plaintext`` and ``CiphertextBlob``
        """
        account_id, region_name, key_id = self._parse_key_id(key_id, context)
        wrapping_key = self._get_kms_key(account_id, region_name, key_id)
        # TODO Should also have a validation for the key being a symmetric one.
        self._validate_key_for_encryption_decryption(context, wrapping_key)

        data_key = KmsCryptoKey("SYMMETRIC_DEFAULT")
        response_fields = {"KeyId": wrapping_key.metadata["Arn"]}
        response_fields["Plaintext"] = data_key.key_material
        response_fields["CiphertextBlob"] = wrapping_key.encrypt(
            data_key.key_material, encryption_context
        )
        return response_fields
862

863
    @handler("GenerateDataKey", expand=False)
    def generate_data_key(
        self, context: RequestContext, request: GenerateDataKeyRequest
    ) -> GenerateDataKeyResponse:
        """
        Generate a data key and return it in plaintext and encrypted form.

        Only ``KeyId`` and ``EncryptionContext`` are read from the request.

        :param context: request context, used to resolve the key identifier
        :param request: the GenerateDataKey request
        :return: response built from the fields produced by ``_generate_data_key``
        """
        response_fields = self._generate_data_key(
            context,
            key_id=request.get("KeyId"),
            encryption_context=request.get("EncryptionContext"),
        )
        return GenerateDataKeyResponse(**response_fields)
1✔
871

872
    @handler("GenerateDataKeyWithoutPlaintext", expand=False)
    def generate_data_key_without_plaintext(
        self, context: RequestContext, request: GenerateDataKeyWithoutPlaintextRequest
    ) -> GenerateDataKeyWithoutPlaintextResponse:
        """
        Generate a data key and return only its encrypted form.

        Only ``KeyId`` and ``EncryptionContext`` are read from the request.

        :param context: request context, used to resolve the key identifier
        :param request: the GenerateDataKeyWithoutPlaintext request
        :return: response with the fields produced by ``_generate_data_key`` minus ``Plaintext``
        """
        response_fields = self._generate_data_key(
            context,
            key_id=request.get("KeyId"),
            encryption_context=request.get("EncryptionContext"),
        )
        # The plaintext data key is dropped before the response is built.
        del response_fields["Plaintext"]
        return GenerateDataKeyWithoutPlaintextResponse(**response_fields)
1✔
881

882
    @handler("GenerateMac", expand=False)
    def generate_mac(
        self,
        context: RequestContext,
        request: GenerateMacRequest,
    ) -> GenerateMacResponse:
        """
        Compute a MAC over the request's ``Message`` with the requested key and algorithm.

        The message length is validated first, then the key, then the algorithm against the key.

        :param context: request context, used to resolve the key identifier
        :param request: request providing ``Message``, ``KeyId`` and ``MacAlgorithm``
        :return: response with the ``Mac``, the ``MacAlgorithm`` used and ``KeyId`` (the key ARN)
        """
        message = request.get("Message")
        self._validate_mac_msg_length(message)

        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        mac_key = self._get_kms_key(account_id, region_name, key_id)
        self._validate_key_for_generate_verify_mac(context, mac_key)

        mac_algorithm = request.get("MacAlgorithm")
        self._validate_mac_algorithm(mac_key, mac_algorithm)

        return GenerateMacResponse(
            Mac=mac_key.generate_mac(message, mac_algorithm),
            MacAlgorithm=mac_algorithm,
            KeyId=mac_key.metadata.get("Arn"),
        )
1✔
902

903
    @handler("VerifyMac", expand=False)
    def verify_mac(
        self,
        context: RequestContext,
        request: VerifyMacRequest,
    ) -> VerifyMacResponse:
        """
        Check the request's ``Mac`` against its ``Message`` with the requested key and algorithm.

        The message length is validated first, then the key, then the algorithm against the key.

        :param context: request context, used to resolve the key identifier
        :param request: request providing ``Message``, ``Mac``, ``KeyId`` and ``MacAlgorithm``
        :return: response with ``KeyId`` (the key ARN), ``MacValid`` and the ``MacAlgorithm`` used
        """
        message = request.get("Message")
        self._validate_mac_msg_length(message)

        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        mac_key = self._get_kms_key(account_id, region_name, key_id)
        self._validate_key_for_generate_verify_mac(context, mac_key)

        mac_algorithm = request.get("MacAlgorithm")
        self._validate_mac_algorithm(mac_key, mac_algorithm)

        verification_result = mac_key.verify_mac(message, request.get("Mac"), mac_algorithm)
        return VerifyMacResponse(
            KeyId=mac_key.metadata.get("Arn"),
            MacValid=verification_result,
            MacAlgorithm=mac_algorithm,
        )
925

926
    @handler("Sign", expand=False)
    def sign(self, context: RequestContext, request: SignRequest) -> SignResponse:
        """
        Sign the request's ``Message`` with the requested key and signing algorithm.

        :param context: request context, used to resolve the key identifier
        :param request: request providing ``KeyId``, ``Message``, ``MessageType`` and
            ``SigningAlgorithm``
        :return: response with ``KeyId`` (the key ARN), the ``Signature`` and the
            ``SigningAlgorithm`` used
        """
        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        signing_key = self._get_kms_key(account_id, region_name, key_id)
        self._validate_key_for_sign_verify(context, signing_key)

        # TODO Add constraints on KeySpec / SigningAlgorithm pairs:
        #  https://docs.aws.amazon.com/kms/latest/developerguide/asymmetric-key-specs.html#key-spec-ecc

        signing_algorithm = request.get("SigningAlgorithm")
        signature = signing_key.sign(
            request.get("Message"), request.get("MessageType"), signing_algorithm
        )

        return SignResponse(
            KeyId=signing_key.metadata["Arn"],
            Signature=signature,
            SigningAlgorithm=signing_algorithm,
        )
1✔
945

946
    # Currently LocalStack only calculates SHA256 digests no matter what the signing algorithm is.
947
    @handler("Verify", expand=False)
    def verify(self, context: RequestContext, request: VerifyRequest) -> VerifyResponse:
        """
        Verify the request's ``Signature`` over its ``Message`` with the requested key.

        :param context: request context, used to resolve the key identifier
        :param request: request providing ``KeyId``, ``Message``, ``MessageType``,
            ``SigningAlgorithm`` and ``Signature``
        :return: response with ``KeyId`` (the key ARN), ``SignatureValid`` and the
            ``SigningAlgorithm`` used
        """
        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        verification_key = self._get_kms_key(account_id, region_name, key_id)
        self._validate_key_for_sign_verify(context, verification_key)

        signing_algorithm = request.get("SigningAlgorithm")
        signature_valid = verification_key.verify(
            request.get("Message"),
            request.get("MessageType"),
            signing_algorithm,
            request.get("Signature"),
        )

        return VerifyResponse(
            KeyId=verification_key.metadata["Arn"],
            SignatureValid=signature_valid,
            SigningAlgorithm=signing_algorithm,
        )
1✔
968

969
    def re_encrypt(
        self,
        context: RequestContext,
        ciphertext_blob: CiphertextType,
        destination_key_id: KeyIdType,
        source_encryption_context: EncryptionContextType = None,
        source_key_id: KeyIdType = None,
        destination_encryption_context: EncryptionContextType = None,
        source_encryption_algorithm: EncryptionAlgorithmSpec = None,
        destination_encryption_algorithm: EncryptionAlgorithmSpec = None,
        grant_tokens: GrantTokenList = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> ReEncryptResponse:
        """
        Decrypt a ciphertext with its source key and encrypt the plaintext under another key.

        The work is delegated to ``decrypt`` and ``encrypt`` of this provider. ``dry_run`` is only
        forwarded to ``encrypt``; ``destination_encryption_algorithm`` is only echoed in the
        response.

        :param context: request context, used to resolve both key identifiers
        :param ciphertext_blob: ciphertext to re-encrypt
        :param destination_key_id: identifier of the key to encrypt the plaintext with
        :param source_encryption_context: encryption context passed to ``decrypt``
        :param source_key_id: identifier of the key the ciphertext was encrypted with. Optional:
            when omitted, ``decrypt`` determines the key from the ciphertext blob itself
        :param destination_encryption_context: encryption context passed to ``encrypt``
        :param source_encryption_algorithm: algorithm passed to ``decrypt`` and echoed back
        :param destination_encryption_algorithm: algorithm echoed back in the response
        :param grant_tokens: forwarded to ``decrypt`` and ``encrypt``
        :param dry_run: forwarded to ``encrypt``
        :return: response with the new ``CiphertextBlob``, the ARNs of the source and destination
            keys and both encryption algorithms
        """
        # TODO: when implementing, ensure cross-account support for source_key_id and destination_key_id
        source_key_arn = None
        if source_key_id:
            # Parse and fetch source key up front so that an unknown/unusable key fails early.
            account_id, region_name, source_key_id = self._parse_key_id(source_key_id, context)
            source_key = self._get_kms_key(account_id, region_name, source_key_id)
            source_key_arn = source_key.metadata.get("Arn")

        # Decrypt using source key. Without a source key ID, decrypt() falls back to the key ID
        # embedded in the ciphertext blob instead of failing on a missing identifier.
        decrypt_response = self.decrypt(
            context=context,
            ciphertext_blob=ciphertext_blob,
            encryption_context=source_encryption_context,
            encryption_algorithm=source_encryption_algorithm,
            key_id=source_key_id,
            grant_tokens=grant_tokens,
        )
        if source_key_arn is None:
            # decrypt() reports the ARN of the key it actually used in its "KeyId" field.
            source_key_arn = decrypt_response["KeyId"]

        # Parse and fetch destination key
        account_id, region_name, destination_key_id = self._parse_key_id(
            destination_key_id, context
        )
        destination_key = self._get_kms_key(account_id, region_name, destination_key_id)

        # Encrypt using destination key
        encrypt_response = self.encrypt(
            context=context,
            encryption_context=destination_encryption_context,
            key_id=destination_key_id,
            plaintext=decrypt_response["Plaintext"],
            grant_tokens=grant_tokens,
            dry_run=dry_run,
        )
        return ReEncryptResponse(
            CiphertextBlob=encrypt_response["CiphertextBlob"],
            SourceKeyId=source_key_arn,
            KeyId=destination_key.metadata.get("Arn"),
            SourceEncryptionAlgorithm=source_encryption_algorithm,
            DestinationEncryptionAlgorithm=destination_encryption_algorithm,
        )
1017

1018
    def encrypt(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        plaintext: PlaintextType,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        encryption_algorithm: EncryptionAlgorithmSpec = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> EncryptResponse:
        """
        Encrypt ``plaintext`` with the given KMS key.

        ``grant_tokens`` and ``dry_run`` are accepted but not acted upon.

        :param context: request context, used to resolve ``key_id``
        :param key_id: identifier of the key to encrypt with
        :param plaintext: data to encrypt
        :param encryption_context: encryption context handed to the key's ``encrypt``
        :param encryption_algorithm: used for plaintext validation and echoed in the response
        :return: response with the ``CiphertextBlob``, ``KeyId`` (the key ARN) and the
            ``EncryptionAlgorithm`` that was requested
        """
        # TODO add support for "dry_run"
        account_id, region_name, key_id = self._parse_key_id(key_id, context)
        kms_key = self._get_kms_key(account_id, region_name, key_id)

        # Plaintext checks run before the key usage/state checks.
        self._validate_plaintext_length(plaintext)
        self._validate_plaintext_key_type_based(plaintext, kms_key, encryption_algorithm)
        self._validate_key_for_encryption_decryption(context, kms_key)
        self._validate_key_state_not_pending_import(kms_key)

        # For compatibility, we return EncryptionAlgorithm values expected from AWS. But LocalStack currently always
        # encrypts with symmetric encryption no matter the key settings.
        encrypted = kms_key.encrypt(plaintext, encryption_context)
        return EncryptResponse(
            CiphertextBlob=encrypted,
            KeyId=kms_key.metadata.get("Arn"),
            EncryptionAlgorithm=encryption_algorithm,
        )
1045

1046
    # TODO We currently do not even check encryption_context, while moto does. Should add the corresponding logic later.
1047
    def decrypt(
        self,
        context: RequestContext,
        ciphertext_blob: CiphertextType,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        key_id: KeyIdType = None,
        encryption_algorithm: EncryptionAlgorithmSpec = None,
        recipient: RecipientInfo = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> DecryptResponse:
        """
        Decrypt ``ciphertext_blob`` and return the plaintext, or re-encrypt it for a recipient.

        ``grant_tokens`` and ``dry_run`` are accepted but not acted upon.

        :param context: request context, used to resolve the key identifier
        :param ciphertext_blob: data to decrypt
        :param encryption_context: encryption context handed to the key's ``decrypt``
        :param key_id: identifier of the key to decrypt with. When omitted, the key ID stored in
            the deserialized ciphertext blob is used
        :param encryption_algorithm: echoed back in the response
        :param recipient: when it carries an attestation document from which a public key can be
            extracted, the plaintext is returned encrypted to that key as
            ``CiphertextForRecipient`` instead of as ``Plaintext``
        :return: response with ``KeyId`` (the key ARN), ``EncryptionAlgorithm`` and either
            ``Plaintext`` or ``CiphertextForRecipient``
        :raises InvalidCiphertextException: if no key ID is given and the blob cannot be
            deserialized, if a non-RSA key is used with a blob that cannot be deserialized, or if
            decryption fails with ``InvalidTag``
        :raises IncorrectKeyException: if the blob was deserialized and names a different key than
            the one resolved from ``key_id``
        """
        # In AWS, key_id is only supplied for data encrypted with an asymmetrical algorithm. For symmetrical
        # encryption, key_id is taken from the encrypted data itself.
        # Since LocalStack doesn't currently do asymmetrical encryption, there is a question of modeling here: we
        # currently expect data to be only encrypted with symmetric encryption, so having key_id inside. It might not
        # always be what customers expect.
        if key_id:
            account_id, region_name, key_id = self._parse_key_id(key_id, context)
            try:
                ciphertext = deserialize_ciphertext_blob(ciphertext_blob=ciphertext_blob)
            except Exception as e:
                # Not fatal here: the RSA branch below handles blobs that cannot be deserialized.
                logging.error("Error deserializing ciphertext blob: %s", e)
                ciphertext = None
        else:
            try:
                ciphertext = deserialize_ciphertext_blob(ciphertext_blob=ciphertext_blob)
                account_id, region_name, key_id = self._parse_key_id(ciphertext.key_id, context)
            except Exception as e:
                # Keep the original failure attached as the cause to ease debugging.
                raise InvalidCiphertextException(
                    "LocalStack is unable to deserialize the ciphertext blob. Perhaps the "
                    "blob didn't come from LocalStack"
                ) from e

        key = self._get_kms_key(account_id, region_name, key_id)
        if ciphertext and key.metadata["KeyId"] != ciphertext.key_id:
            raise IncorrectKeyException(
                "The key ID in the request does not identify a CMK that can perform this operation."
            )

        self._validate_key_for_encryption_decryption(context, key)
        self._validate_key_state_not_pending_import(key)

        # Handle the recipient field.  This is used by AWS Nitro to re-encrypt the plaintext to the key specified
        # by the enclave.  Proper support for this will take significant work to figure out how to model enforcing
        # the attestation measurements; for now, if recipient is specified and has an attestation doc in it including
        # a public key where it's expected to be, we encrypt to that public key.  This at least allows users to use
        # localstack as a drop-in replacement for AWS when testing without having to skip the secondary decryption
        # when using localstack.
        recipient_pubkey = None
        if recipient:
            attestation_document = recipient["AttestationDocument"]
            # We do all of this in a try/catch and warn if it fails so that if users are currently passing a nonsense
            # value we don't break it for them.  In the future we could do a breaking change to require a valid attestation
            # (or at least one that contains the public key).
            try:
                recipient_pubkey = self._extract_attestation_pubkey(attestation_document)
            except Exception as e:
                logging.warning(
                    "Unable to extract public key from non-empty attestation document: %s", e
                )

        try:
            # TODO: Extend the implementation to handle additional encryption/decryption scenarios
            # beyond the current support for offline encryption and online decryption using RSA keys if key id exists in
            # parameters, where `ciphertext_blob` will not be deserializable.
            if self._is_rsa_spec(key.crypto_key.key_spec) and not ciphertext:
                plaintext = key.decrypt_rsa(ciphertext_blob)
            else:
                # if symmetric encryption then ciphertext must not be None
                if ciphertext is None:
                    raise InvalidCiphertextException()
                plaintext = key.decrypt(ciphertext, encryption_context)
        except InvalidTag as e:
            raise InvalidCiphertextException() from e

        # For compatibility, we return EncryptionAlgorithm values expected from AWS. But LocalStack currently always
        # encrypts with symmetric encryption no matter the key settings.
        #
        # We return a key ARN instead of KeyId despite the name of the parameter, as this is what AWS does and states
        # in its docs.
        #  https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html#API_Decrypt_RequestSyntax
        # TODO add support for "dry_run"
        response = DecryptResponse(
            KeyId=key.metadata.get("Arn"),
            EncryptionAlgorithm=encryption_algorithm,
        )

        # Encrypt to the recipient pubkey if specified.  Otherwise, return the actual plaintext
        if recipient_pubkey:
            response["CiphertextForRecipient"] = pkcs7_envelope_encrypt(plaintext, recipient_pubkey)
        else:
            response["Plaintext"] = plaintext

        return response
1✔
1143

1144
    def get_parameters_for_import(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        wrapping_algorithm: AlgorithmSpec,
        wrapping_key_spec: WrappingKeySpec,
        **kwargs,
    ) -> GetParametersForImportResponse:
        """
        Create a wrapping key and an import token for importing key material into a KMS key.

        The import state (target key ID, token, wrapping algorithm and wrapping key) is stored
        under the token in the store of the caller's account and region.

        :param context: request context providing account ID and region
        :param key_id: identifier of the key that will receive the key material
        :param wrapping_algorithm: algorithm recorded in the import state
        :param wrapping_key_spec: key spec of the wrapping key to create
        :return: response with ``KeyId`` (the target key ARN), ``ImportToken``, the wrapping
            key's ``PublicKey`` and ``ParametersValidTo``
        :raises UnsupportedOperationException: if the target key's origin is not ``EXTERNAL``
        """
        store = self._get_store(context.account_id, context.region)

        # KeyId can potentially hold one of multiple different types of key identifiers. get_key finds a key no
        # matter which type of id is used.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key_arn = target_key.metadata["Arn"]
        key_origin = target_key.metadata.get("Origin")
        if key_origin != "EXTERNAL":
            raise UnsupportedOperationException(
                f"{key_arn} origin is {key_origin} which is not valid for this operation."
            )

        # From here on the canonical key ID is used, whatever identifier type the caller passed.
        key_id = target_key.metadata["KeyId"]

        wrapping_key = KmsKey(CreateKeyRequest(KeySpec=wrapping_key_spec))
        import_token = short_uid()
        import_state = KeyImportState(
            key_id=key_id,
            import_token=import_token,
            wrapping_algo=wrapping_algorithm,
            key=wrapping_key,
        )
        store.imports[import_token] = import_state

        # https://docs.aws.amazon.com/kms/latest/APIReference/API_GetParametersForImport.html
        # "To import key material, you must use the public key and import token from the same response. These items
        # are valid for 24 hours."
        # NOTE(review): the validity reported below is 100 days, not the 24 hours quoted above - confirm
        # whether this is intentional.
        validity_period = datetime.timedelta(days=100)
        expiry_date = datetime.datetime.now() + validity_period

        return GetParametersForImportResponse(
            KeyId=target_key.metadata["Arn"],
            ImportToken=to_bytes(import_state.import_token),
            PublicKey=import_state.key.crypto_key.public_key,
            ParametersValidTo=expiry_date,
        )
1188

1189
    def import_key_material(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        import_token: CiphertextType,
        encrypted_key_material: CiphertextType,
        valid_to: DateType | None = None,
        expiration_model: ExpirationModelType | None = None,
        import_type: ImportType | None = None,
        key_material_description: KeyMaterialDescriptionType | None = None,
        key_material_id: BackingKeyIdType | None = None,
        **kwargs,
    ) -> ImportKeyMaterialResponse:
        """
        Import wrapped key material into a KMS key, using a previously issued import token.

        ``import_type`` and ``key_material_description`` are accepted but not used. A
        caller-supplied ``key_material_id`` is ignored: the ID is always derived from the
        unwrapped key material.

        :param context: request context providing account ID and region
        :param key_id: identifier of the key that receives the key material
        :param import_token: token returned by GetParametersForImport
        :param encrypted_key_material: key material wrapped for the import token's wrapping key
        :param valid_to: expiry of the key material; required when the expiration model is
            ``KEY_MATERIAL_EXPIRES`` (it is only checked for presence, not enforced)
        :param expiration_model: expiration model; defaults to ``KEY_MATERIAL_EXPIRES``
        :return: response with ``KeyId`` (the key ARN) and, for ``SYMMETRIC_DEFAULT`` keys only,
            the ``KeyMaterialId``
        :raises NotFoundException: if the import token is unknown
        :raises ValidationException: if the material expires but no ``valid_to`` was given
        :raises KMSInvalidStateException: if other key material is still pending rotation
        """
        store = self._get_store(context.account_id, context.region)
        import_token = to_str(import_token)
        import_state = store.imports.get(import_token)
        if not import_state:
            raise NotFoundException(f"Unable to find key import token '{import_token}'")
        key_to_import_material_to = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )

        # TODO check if there was already a key imported for this kms key
        # if so, it has to be identical. We cannot change keys by reimporting after deletion/expiry
        key_material = self._decrypt_wrapped_key_material(import_state, encrypted_key_material)
        key_material_id = key_to_import_material_to.generate_key_material_id(key_material)

        # Run every check before touching the key, so that a rejected import leaves the key's
        # metadata (in particular "ExpirationModel") exactly as it was.
        effective_expiration_model = expiration_model or ExpirationModelType.KEY_MATERIAL_EXPIRES
        if (
            effective_expiration_model == ExpirationModelType.KEY_MATERIAL_EXPIRES
            and not valid_to
        ):
            raise ValidationException(
                "A validTo date must be set if the ExpirationModel is KEY_MATERIAL_EXPIRES"
            )
        if existing_pending_material := key_to_import_material_to.crypto_key.pending_key_material:
            pending_key_material_id = key_to_import_material_to.generate_key_material_id(
                existing_pending_material
            )
            raise KMSInvalidStateException(
                f"New key material (id: {key_material_id}) cannot be imported into KMS key "
                f"{key_to_import_material_to.metadata['Arn']}, because another key material "
                f"(id: {pending_key_material_id}) is pending rotation."
            )

        # TODO actually set validTo and make the key expire
        key_to_import_material_to.metadata["ExpirationModel"] = effective_expiration_model
        key_to_import_material_to.metadata["Enabled"] = True
        key_to_import_material_to.metadata["KeyState"] = KeyState.Enabled
        key_to_import_material_to.crypto_key.load_key_material(key_material)

        # KeyMaterialId / CurrentKeyMaterialId is only exposed for symmetric encryption keys.
        key_material_id_response = None
        if key_to_import_material_to.metadata["KeySpec"] == KeySpec.SYMMETRIC_DEFAULT:
            key_material_id_response = key_to_import_material_to.generate_key_material_id(
                key_material
            )

            # If there is no CurrentKeyMaterialId, instantly promote the pending key material to the current.
            if key_to_import_material_to.metadata.get("CurrentKeyMaterialId") is None:
                key_to_import_material_to.metadata["CurrentKeyMaterialId"] = (
                    key_material_id_response
                )
                key_to_import_material_to.crypto_key.key_material = (
                    key_to_import_material_to.crypto_key.pending_key_material
                )
                key_to_import_material_to.crypto_key.pending_key_material = None

        return ImportKeyMaterialResponse(
            KeyId=key_to_import_material_to.metadata["Arn"],
            KeyMaterialId=key_material_id_response,
        )
1266

1267
    def delete_imported_key_material(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_material_id: BackingKeyIdType | None = None,
        **kwargs,
    ) -> DeleteImportedKeyMaterialResponse:
        """
        Remove the key material of a KMS key and put the key back into ``PendingImport`` state.

        The key is disabled and its ``ExpirationModel`` metadata entry is dropped.
        ``key_material_id`` is accepted but not used.

        :param context: request context providing account ID and region
        :param key_id: identifier of the key whose material is deleted
        :return: an empty response
        """
        # TODO add support for key_material_id
        kms_key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        kms_key.crypto_key.key_material = None

        metadata = kms_key.metadata
        metadata["Enabled"] = False
        metadata["KeyState"] = KeyState.PendingImport
        # The entry may be absent, hence the default.
        metadata.pop("ExpirationModel", None)

        # TODO populate DeleteImportedKeyMaterialResponse
        return DeleteImportedKeyMaterialResponse()
1✔
1289

1290
    @handler("CreateAlias", expand=False)
    def create_alias(self, context: RequestContext, request: CreateAliasRequest) -> None:
        """
        Create an alias pointing at an existing KMS key.

        The request's ``TargetKeyId`` is replaced by the resolved key's ``KeyId`` before the alias
        is created.

        :param context: request context providing account ID and region
        :param request: request providing ``AliasName`` and ``TargetKeyId``
        :raises AlreadyExistsException: if an alias with that name already exists in the store
        """
        store = self._get_store(context.account_id, context.region)
        alias_name = request["AliasName"]
        validate_alias_name(alias_name)

        if alias_name in store.aliases:
            existing_alias = store.aliases.get(alias_name)
            alias_arn = existing_alias.metadata["AliasArn"]
            # AWS itself uses AliasArn instead of AliasName in this exception.
            raise AlreadyExistsException(f"An alias with the name {alias_arn} already exists")

        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("TargetKeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        request["TargetKeyId"] = target_key.metadata.get("KeyId")
        self._create_kms_alias(context.account_id, context.region, request)
1✔
1310

1311
    @handler("DeleteAlias", expand=False)
    def delete_alias(self, context: RequestContext, request: DeleteAliasRequest) -> None:
        """
        Remove an alias from the store of the caller's account and region.

        :param context: request context providing account ID and region
        :param request: request providing the ``AliasName`` to delete
        :raises NotFoundException: if no alias with that name exists in the store
        """
        # We do not check the state of the key, as, according to AWS docs, all key states, that are possible in
        # LocalStack, are supported by this operation.
        aliases = self._get_store(context.account_id, context.region).aliases
        alias_name = request["AliasName"]

        if alias_name in aliases:
            aliases.pop(alias_name, None)
            return

        alias_arn = kms_alias_arn(request["AliasName"], context.account_id, context.region)
        # AWS itself uses AliasArn instead of AliasName in this exception.
        raise NotFoundException(f"Alias {alias_arn} is not found")
1✔
1322

1323
    @handler("UpdateAlias", expand=False)
    def update_alias(self, context: RequestContext, request: UpdateAliasRequest) -> None:
        """
        Point an existing alias at a different KMS key.

        The alias stores the resolved key's ``KeyId`` rather than the raw ``TargetKeyId`` from the
        request, matching what ``create_alias`` stores and what ``list_aliases`` filters on.

        :param context: request context providing account ID and region
        :param request: request providing ``AliasName`` and the new ``TargetKeyId``
        """
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the source KMS key is pending deletion, the command succeeds. If the destination KMS key is pending
        # deletion, the command fails with error: KMSInvalidStateException : <key ARN> is pending deletion."
        # Also disabled keys are accepted for this operation (see the table on that page).
        #
        # As such, we do not care about the state of the source key, but check the destination one.

        alias_name = request["AliasName"]
        # This API, per AWS docs, accepts only names, not ARNs.
        validate_alias_name(alias_name)
        alias = self._get_kms_alias(context.account_id, context.region, alias_name)

        # Fetching the key validates its state and also yields its canonical key ID. The request may
        # identify the key by something other than the plain key ID (e.g. an ARN); storing that raw
        # value would make the alias invisible to list_aliases() when filtering by key.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request["TargetKeyId"],
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        alias.metadata["TargetKeyId"] = target_key.metadata.get("KeyId")
        alias.update_date_of_last_update()
1✔
1347

1348
    @handler("ListAliases")
    def list_aliases(
        self,
        context: RequestContext,
        key_id: KeyIdType = None,
        limit: LimitType = None,
        marker: MarkerType = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """
        Return one page of aliases from the store of the caller's account and region.

        :param context: request context providing account ID and region
        :param key_id: when given, only aliases whose ``TargetKeyId`` equals the resolved key's
            ``KeyId`` are returned
        :param limit: page size; 100 is used when it is not given
        :param marker: pagination token handed to the paginated list as ``next_token``
        :return: the page of alias metadata; ``NextMarker`` and ``Truncated`` are only included
            when the paginated list reports a next token
        """
        store = self._get_store(context.account_id, context.region)

        if key_id:
            # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
            # matter which type of id is used.
            target_key = self._get_kms_key(
                context.account_id, context.region, key_id, any_key_state_allowed=True
            )
            key_id = target_key.metadata.get("KeyId")

        # Without a key filter every alias matches.
        matching_aliases = [
            alias.metadata
            for alias in store.aliases.values()
            if not key_id or alias.metadata["TargetKeyId"] == key_id
        ]

        page, next_token = PaginatedList(matching_aliases).get_page(
            lambda alias_metadata: alias_metadata.get("AliasName"),
            next_token=marker,
            page_size=limit or 100,
        )

        pagination = {}
        if next_token:
            pagination = {"NextMarker": next_token, "Truncated": True}
        return ListAliasesResponse(Aliases=page, **pagination)
1✔
1380

1381
    @handler("GetKeyRotationStatus", expand=False)
    def get_key_rotation_status(
        self, context: RequestContext, request: GetKeyRotationStatusRequest
    ) -> GetKeyRotationStatusResponse:
        """
        Report whether rotation is enabled for a key, together with its next rotation date.

        ``RotationPeriodInDays`` is only part of the response while rotation is enabled. The key is looked up
        regardless of its state.

        https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        We do not model that here, though.
        """
        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(account_id, region_name, key_id, any_key_state_allowed=True)

        status = GetKeyRotationStatusResponse(
            KeyId=key.metadata["Arn"],
            KeyRotationEnabled=key.is_key_rotation_enabled,
            NextRotationDate=key.next_rotation_date,
        )
        if not key.is_key_rotation_enabled:
            return status

        status["RotationPeriodInDays"] = key.rotation_period_in_days
        return status
1✔
1400

1401
    @handler("DisableKeyRotation", expand=False)
    def disable_key_rotation(
        self, context: RequestContext, request: DisableKeyRotationRequest
    ) -> None:
        """
        Switch off the rotation flag of the requested key.

        https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        We do not model that here, though.
        """
        target_key = self._get_kms_key(context.account_id, context.region, request.get("KeyId"))
        target_key.is_key_rotation_enabled = False
1✔
1410

1411
    @handler("EnableKeyRotation", expand=False)
    def enable_key_rotation(
        self, context: RequestContext, request: EnableKeyRotationRequest
    ) -> None:
        """
        Switch on the rotation flag of the requested key and refresh its rotation date.

        A truthy ``RotationPeriodInDays`` in the request replaces the key's current rotation period; otherwise
        the period is left untouched.

        https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        We do not model that here, though.
        """
        target_key = self._get_kms_key(context.account_id, context.region, request.get("KeyId"))
        target_key.is_key_rotation_enabled = True

        if rotation_period := request.get("RotationPeriodInDays"):
            target_key.rotation_period_in_days = rotation_period

        target_key._update_key_rotation_date()
1✔
1423

1424
    @handler("ListKeyPolicies", expand=False)
    def list_key_policies(
        self, context: RequestContext, request: ListKeyPoliciesRequest
    ) -> ListKeyPoliciesResponse:
        """
        Return the policy names of a key, which is always the single name "default".

        By AWS specifications the response is the same for all keys, as the only supported policy is "default":
        https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeyPolicies.html#API_ListKeyPolicies_ResponseElements
        """
        # The lookup result is not used: the call only serves to verify that the key exists (in any state).
        self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            any_key_state_allowed=True,
        )
        return ListKeyPoliciesResponse(PolicyNames=["default"], Truncated=False)
1✔
1435

1436
    @handler("PutKeyPolicy", expand=False)
    def put_key_policy(self, context: RequestContext, request: PutKeyPolicyRequest) -> None:
        """
        Store the ``Policy`` of the request on the key.

        The key is resolved first (in any state); afterwards any ``PolicyName`` other than "default" is
        rejected with an ``UnsupportedOperationException``.
        """
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            any_key_state_allowed=True,
        )

        policy_name = request.get("PolicyName")
        if policy_name != "default":
            raise UnsupportedOperationException("Only default policy is supported")

        target_key.policy = request.get("Policy")
1✔
1444

1445
    @handler("GetKeyPolicy", expand=False)
    def get_key_policy(
        self, context: RequestContext, request: GetKeyPolicyRequest
    ) -> GetKeyPolicyResponse:
        """
        Return the policy stored on the key.

        The key is resolved first (in any state); afterwards any ``PolicyName`` other than "default" results in
        a ``NotFoundException``.
        """
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            any_key_state_allowed=True,
        )

        policy_name = request.get("PolicyName")
        if policy_name != "default":
            raise NotFoundException("No such policy exists")

        return GetKeyPolicyResponse(Policy=target_key.policy)
1✔
1455

1456
    @handler("ListResourceTags", expand=False)
    def list_resource_tags(
        self, context: RequestContext, request: ListResourceTagsRequest
    ) -> ListResourceTagsResponse:
        """
        Return one page of the key's tags as ``{"TagKey": ..., "TagValue": ...}`` entries.

        ``Marker`` of the request is used as pagination token and ``Limit`` as page size (50 when absent).
        ``NextMarker``/``Truncated`` are only set when a further page exists. The key may be in any state.
        """
        tagged_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), any_key_state_allowed=True
        )

        tag_entries = [
            {"TagKey": name, "TagValue": value} for name, value in tagged_key.tags.items()
        ]
        page, next_token = PaginatedList(tag_entries).get_page(
            lambda tag: tag.get("TagKey"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 50),
        )

        if next_token:
            return ListResourceTagsResponse(Tags=page, NextMarker=next_token, Truncated=True)
        return ListResourceTagsResponse(Tags=page)
1✔
1473

1474
    @handler("RotateKeyOnDemand", expand=False)
    # TODO: return the key rotations in the ListKeyRotations operation
    def rotate_key_on_demand(
        self, context: RequestContext, request: RotateKeyOnDemandRequest
    ) -> RotateKeyOnDemandResponse:
        """
        Trigger an immediate rotation of the requested key and return the key's ARN.

        Checks are applied in this order: the key spec must be ``SYMMETRIC_DEFAULT`` (otherwise
        ``UnsupportedOperationException``), the key must not be pending import, and a key of external origin
        must have pending key material.
        """
        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        target_key = self._get_kms_key(account_id, region_name, key_id)

        is_symmetric = target_key.metadata["KeySpec"] == KeySpec.SYMMETRIC_DEFAULT
        if not is_symmetric:
            raise UnsupportedOperationException()

        self._validate_key_state_not_pending_import(target_key)
        self._validate_external_key_has_pending_material(target_key)

        target_key.rotate_key_on_demand()

        return RotateKeyOnDemandResponse(KeyId=target_key.metadata["Arn"])
1492

1493
    @handler("TagResource", expand=False)
    def tag_resource(self, context: RequestContext, request: TagResourceRequest) -> None:
        """
        Add the ``Tags`` of the request to the key.

        The key lookup allows both enabled and disabled keys.
        """
        allowed_states = {"enabled_key_allowed": True, "disabled_key_allowed": True}
        tagged_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), **allowed_states
        )
        tagged_key.add_tags(request.get("Tags"))
1✔
1503

1504
    @handler("UntagResource", expand=False)
    def untag_resource(self, context: RequestContext, request: UntagResourceRequest) -> None:
        """
        Remove every tag named in ``TagKeys`` from the key.

        The key lookup allows both enabled and disabled keys and happens before the tag keys are inspected;
        a missing or empty ``TagKeys`` is a no-op.
        """
        allowed_states = {"enabled_key_allowed": True, "disabled_key_allowed": True}
        tagged_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), **allowed_states
        )

        tag_names = request.get("TagKeys")
        if not tag_names:
            return

        for tag_name in tag_names:
            # AWS doesn't seem to mind removal of a non-existent tag, so we do not raise any exception.
            tagged_key.tags.pop(tag_name, None)
1✔
1518

1519
    def derive_shared_secret(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_agreement_algorithm: KeyAgreementAlgorithmSpec,
        public_key: PublicKeyType,
        grant_tokens: GrantTokenList = None,
        dry_run: NullableBooleanType = None,
        recipient: RecipientInfo = None,
        **kwargs,
    ) -> DeriveSharedSecretResponse:
        """
        Derive a shared secret from the given KMS key and a peer's public key.

        Validation happens in this order:
        1. the key's ``KeyUsage`` must be ``KEY_AGREEMENT`` (``InvalidKeyUsageException`` otherwise),
        2. the requested algorithm must be ``ECDH`` (``ValidationException`` otherwise),
        3. the key's ``Origin`` must be ``AWS_KMS`` or ``EXTERNAL`` (``ValueError`` otherwise).

        ``grant_tokens``, ``dry_run`` and ``recipient`` are accepted but not used by this implementation.

        :return: response echoing the requested key id and algorithm, together with the key origin and the
            derived secret
        """
        key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        metadata = key.metadata
        usage = metadata.get("KeyUsage")
        origin = metadata.get("Origin")

        if usage != KeyUsageType.KEY_AGREEMENT:
            raise InvalidKeyUsageException(
                f"{metadata['Arn']} key usage is {usage} which is not valid for {context.operation.name}."
            )

        if key_agreement_algorithm != KeyAgreementAlgorithmSpec.ECDH:
            raise ValidationException(
                f"1 validation error detected: Value '{key_agreement_algorithm}' at 'keyAgreementAlgorithm' "
                "failed to satisfy constraint: Member must satisfy enum value set: [ECDH]"
            )

        # TODO: Verify the actual error raised
        supported_origins = (OriginType.AWS_KMS, OriginType.EXTERNAL)
        if origin not in supported_origins:
            raise ValueError(f"Key origin: {origin} is not valid for {context.operation.name}.")

        return DeriveSharedSecretResponse(
            KeyId=key_id,
            SharedSecret=key.derive_shared_secret(public_key),
            KeyAgreementAlgorithm=key_agreement_algorithm,
            KeyOrigin=origin,
        )
1562

1563
    def _validate_key_state_not_pending_import(self, key: KmsKey):
        """Raise ``KMSInvalidStateException`` if the key's ``KeyState`` is ``PendingImport``."""
        metadata = key.metadata
        if metadata["KeyState"] != KeyState.PendingImport:
            return
        raise KMSInvalidStateException(f"{metadata['Arn']} is pending import.")
1✔
1566

1567
    def _validate_external_key_has_pending_material(self, key: KmsKey):
        """
        Raise ``KMSInvalidStateException`` if a key of origin "EXTERNAL" has no pending key material.

        Keys of any other origin pass without their crypto key being inspected.
        """
        if key.metadata["Origin"] != "EXTERNAL":
            return
        if key.crypto_key.pending_key_material is not None:
            return
        raise KMSInvalidStateException(
            f"No available key material pending rotation for the key: {key.metadata['Arn']}."
        )
1572

1573
    def _validate_key_for_encryption_decryption(self, context: RequestContext, key: KmsKey):
        """Raise ``InvalidKeyUsageException`` unless the key's ``KeyUsage`` is "ENCRYPT_DECRYPT"."""
        metadata = key.metadata
        usage = metadata["KeyUsage"]
        if usage == "ENCRYPT_DECRYPT":
            return
        raise InvalidKeyUsageException(
            f"{metadata['Arn']} key usage is {usage} which is not valid for {context.operation.name}."
        )
1579

1580
    def _validate_key_for_sign_verify(self, context: RequestContext, key: KmsKey):
        """Raise ``InvalidKeyUsageException`` unless the key's ``KeyUsage`` is "SIGN_VERIFY"."""
        metadata = key.metadata
        usage = metadata["KeyUsage"]
        if usage == "SIGN_VERIFY":
            return
        raise InvalidKeyUsageException(
            f"{metadata['Arn']} key usage is {usage} which is not valid for {context.operation.name}."
        )
1586

1587
    def _validate_key_for_generate_verify_mac(self, context: RequestContext, key: KmsKey):
        """Raise ``InvalidKeyUsageException`` unless the key's ``KeyUsage`` is "GENERATE_VERIFY_MAC"."""
        metadata = key.metadata
        usage = metadata["KeyUsage"]
        if usage == "GENERATE_VERIFY_MAC":
            return
        raise InvalidKeyUsageException(
            f"{metadata['Arn']} key usage is {usage} which is not valid for {context.operation.name}."
        )
1593

1594
    def _validate_mac_msg_length(self, msg: bytes):
        """Raise ``ValidationException`` if the MAC message is longer than 4096 bytes."""
        max_length = 4096
        if len(msg) <= max_length:
            return
        raise ValidationException(
            "1 validation error detected: Value at 'message' failed to satisfy constraint: "
            "Member must have length less than or equal to 4096"
        )
1600

1601
    def _validate_mac_algorithm(self, key: KmsKey, algorithm: str):
        """
        Check that ``algorithm`` is a known MAC algorithm and that it fits the key's spec.

        An algorithm that is not an attribute of ``MacAlgorithmSpec`` results in a ``ValidationException``.
        For an algorithm name made of three "_"-separated parts, the first and third part joined by "_"
        (e.g. "HMAC_SHA_256" -> "HMAC_256") must equal the key's ``KeySpec``, otherwise an
        ``InvalidKeyUsageException`` is raised. Names with a different number of parts skip that comparison.
        """
        if not hasattr(MacAlgorithmSpec, algorithm):
            raise ValidationException(
                f"1 validation error detected: Value '{algorithm}' at 'macAlgorithm' "
                "failed to satisfy constraint: Member must satisfy enum value set: "
                "[HMAC_SHA_384, HMAC_SHA_256, HMAC_SHA_224, HMAC_SHA_512]"
            )

        key_spec = key.metadata["KeySpec"]
        name_parts = algorithm.split("_")
        if len(name_parts) != 3:
            return

        family, _, digest_size = name_parts
        if f"{family}_{digest_size}" != key_spec:
            raise InvalidKeyUsageException(
                f"Algorithm {algorithm} is incompatible with key spec {key_spec}."
            )
1615

1616
    def _validate_plaintext_length(self, plaintext: bytes):
        """Raise ``ValidationException`` if the plaintext is longer than 4096 bytes."""
        max_length = 4096
        if len(plaintext) <= max_length:
            return
        raise ValidationException(
            "1 validation error detected: Value at 'plaintext' failed to satisfy constraint: "
            "Member must have length less than or equal to 4096"
        )
1622

1623
    def _validate_grant_request(self, data: dict):
1✔
1624
        if "KeyId" not in data or "GranteePrincipal" not in data or "Operations" not in data:
1✔
1625
            raise ValidationError("Grant ID, key ID and grantee principal must be specified")
×
1626

1627
        for operation in data["Operations"]:
1✔
1628
            if operation not in VALID_OPERATIONS:
1✔
1629
                raise ValidationError(
×
1630
                    f"Value {['Operations']} at 'operations' failed to satisfy constraint: Member must satisfy"
1631
                    f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]"
1632
                )
1633

1634
    def _extract_attestation_pubkey(self, attestation_document: bytes) -> RSAPublicKey:
        """
        Pull the DER-encoded public key out of an attestation document and load it.

        The attestation document comes as a COSE (CBOR Object Signing and Encryption) object: the CBOR
        attestation is signed and then the attestation and signature are again CBOR-encoded. The inner
        attestation is read from index 2 of the decoded outer object. For now we don't bother validating the
        signature, though in the future we could.
        """
        outer_cose = cbor2_loads(attestation_document)
        inner_attestation = cbor2_loads(outer_cose[2])
        return load_der_public_key(inner_attestation["public_key"])
1✔
1642

1643
    def _decrypt_wrapped_key_material(
        self,
        import_state: KeyImportState,
        encrypted_key_material: CiphertextType,
    ) -> bytes:
        """
        Unwrap imported key material with the private key held by the import state.

        Supported wrapping algorithms:
        - ``RSAES_PKCS1_V1_5``: RSA decryption with PKCS#1 v1.5 padding
        - ``RSAES_OAEP_SHA_1`` / ``RSAES_OAEP_SHA_256``: RSA decryption with OAEP padding and the named hash
        - ``RSA_AES_KEY_WRAP_SHA_256``: the first ``key_size // 8`` bytes are an RSA-OAEP(SHA-256) wrapped AES
          key, the remainder is the key material wrapped with that AES key (AES key wrap with padding)

        :raises KMSInvalidStateException: for any other wrapping algorithm
        """
        algo = import_state.wrapping_algo
        private_key = import_state.key.crypto_key.key

        if algo == AlgorithmSpec.RSAES_PKCS1_V1_5:
            return private_key.decrypt(encrypted_key_material, padding.PKCS1v15())

        if algo == AlgorithmSpec.RSAES_OAEP_SHA_1:
            oaep_sha1 = padding.OAEP(padding.MGF1(hashes.SHA1()), hashes.SHA1(), None)
            return private_key.decrypt(encrypted_key_material, oaep_sha1)

        if algo == AlgorithmSpec.RSAES_OAEP_SHA_256:
            oaep_sha256 = padding.OAEP(padding.MGF1(hashes.SHA256()), hashes.SHA256(), None)
            return private_key.decrypt(encrypted_key_material, oaep_sha256)

        if algo == AlgorithmSpec.RSA_AES_KEY_WRAP_SHA_256:
            # The RSA-wrapped AES key occupies exactly one RSA modulus length at the start of the blob.
            split_at = private_key.key_size // 8
            rsa_wrapped_aes_key = encrypted_key_material[:split_at]
            aes_wrapped_material = encrypted_key_material[split_at:]

            oaep_sha256 = padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None,
            )
            aes_key = private_key.decrypt(rsa_wrapped_aes_key, oaep_sha256)
            return keywrap.aes_key_unwrap_with_padding(
                aes_key, aes_wrapped_material, default_backend()
            )

        raise KMSInvalidStateException(
            f"Unsupported padding, requested wrapping algorithm: '{algo}'"
        )
1683

1684
    def _validate_plaintext_key_type_based(
        self,
        plaintext: PlaintextType,
        key: KmsKey,
        encryption_algorithm: EncryptionAlgorithmSpec = None,
    ):
        """
        Reject plaintexts that exceed the size limit of the key spec / encryption algorithm combination.

        RSA keys used with OAEP have the limits listed in the table below; every other combination falls back
        to 4096 bytes. Max size values extracted from AWS boto3 documentation:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kms/client/encrypt.html

        :raises ValidationException: if the plaintext is larger than the applicable limit
        """
        default_limit = 4096  # max allowed size
        rsa_oaep_limits = {
            (KeySpec.RSA_2048, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1): 214,
            (KeySpec.RSA_2048, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256): 190,
            (KeySpec.RSA_3072, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1): 342,
            (KeySpec.RSA_3072, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256): 318,
            (KeySpec.RSA_4096, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1): 470,
            (KeySpec.RSA_4096, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256): 446,
        }

        key_spec = key.metadata["KeySpec"]
        max_size_bytes = rsa_oaep_limits.get((key_spec, encryption_algorithm), default_limit)

        if len(plaintext) <= max_size_bytes:
            return
        raise ValidationException(
            f"Algorithm {encryption_algorithm} and key spec {key.metadata['KeySpec']} cannot encrypt data larger than {max_size_bytes} bytes."
        )
1728

1729

1730
# ---------------
1731
# UTIL FUNCTIONS
1732
# ---------------
1733

1734
# Different AWS services have some internal integrations with KMS. Some create keys, that are used to encrypt/decrypt
1735
# customer's data. Such keys can't be created from outside for security reasons. So AWS services use some internal
1736
# APIs to do that. Functions here are supposed to be used by other LocalStack services to have similar integrations
1737
# with KMS in LocalStack. As such, they are supposed to be proper APIs (as in error and security handling),
1738
# just with more features.
1739

1740

1741
def set_key_managed(key_id: str, account_id: str, region_name: str) -> None:
    """
    Flag the given key as AWS-managed by setting its ``KeyManager`` metadata to "AWS".

    The key is resolved through ``KmsProvider._get_kms_key`` with its default state checks.
    """
    managed_key = KmsProvider._get_kms_key(account_id, region_name, key_id)
    managed_key.metadata.update({"KeyManager": "AWS"})
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc