• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.69
/localstack-core/localstack/services/kms/provider.py
1
import base64
1✔
2
import copy
1✔
3
import datetime
1✔
4
import logging
1✔
5
import os
1✔
6

7
from cbor2 import loads as cbor2_loads
1✔
8
from cryptography.exceptions import InvalidTag
1✔
9
from cryptography.hazmat.backends import default_backend
1✔
10
from cryptography.hazmat.primitives import hashes, keywrap
1✔
11
from cryptography.hazmat.primitives.asymmetric import padding
1✔
12
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
1✔
13
from cryptography.hazmat.primitives.serialization import load_der_public_key
1✔
14

15
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
16
from localstack.aws.api.kms import (
1✔
17
    AlgorithmSpec,
18
    AlreadyExistsException,
19
    BackingKeyIdType,
20
    CancelKeyDeletionRequest,
21
    CancelKeyDeletionResponse,
22
    CiphertextType,
23
    CreateAliasRequest,
24
    CreateGrantRequest,
25
    CreateGrantResponse,
26
    CreateKeyRequest,
27
    CreateKeyResponse,
28
    DataKeyPairSpec,
29
    DateType,
30
    DecryptResponse,
31
    DeleteAliasRequest,
32
    DeleteImportedKeyMaterialResponse,
33
    DeriveSharedSecretResponse,
34
    DescribeKeyRequest,
35
    DescribeKeyResponse,
36
    DisabledException,
37
    DisableKeyRequest,
38
    DisableKeyRotationRequest,
39
    DryRunModifierList,
40
    EnableKeyRequest,
41
    EnableKeyRotationRequest,
42
    EncryptionAlgorithmSpec,
43
    EncryptionContextType,
44
    EncryptResponse,
45
    ExpirationModelType,
46
    GenerateDataKeyPairResponse,
47
    GenerateDataKeyPairWithoutPlaintextResponse,
48
    GenerateDataKeyRequest,
49
    GenerateDataKeyResponse,
50
    GenerateDataKeyWithoutPlaintextRequest,
51
    GenerateDataKeyWithoutPlaintextResponse,
52
    GenerateMacRequest,
53
    GenerateMacResponse,
54
    GenerateRandomRequest,
55
    GenerateRandomResponse,
56
    GetKeyPolicyRequest,
57
    GetKeyPolicyResponse,
58
    GetKeyRotationStatusRequest,
59
    GetKeyRotationStatusResponse,
60
    GetParametersForImportResponse,
61
    GetPublicKeyResponse,
62
    GrantIdType,
63
    GrantTokenList,
64
    GrantTokenType,
65
    ImportKeyMaterialResponse,
66
    ImportType,
67
    IncorrectKeyException,
68
    InvalidCiphertextException,
69
    InvalidGrantIdException,
70
    InvalidKeyUsageException,
71
    KeyAgreementAlgorithmSpec,
72
    KeyIdType,
73
    KeyMaterialDescriptionType,
74
    KeySpec,
75
    KeyState,
76
    KeyUsageType,
77
    KmsApi,
78
    KMSInvalidStateException,
79
    LimitType,
80
    ListAliasesResponse,
81
    ListGrantsRequest,
82
    ListGrantsResponse,
83
    ListKeyPoliciesRequest,
84
    ListKeyPoliciesResponse,
85
    ListKeysRequest,
86
    ListKeysResponse,
87
    ListResourceTagsRequest,
88
    ListResourceTagsResponse,
89
    MacAlgorithmSpec,
90
    MarkerType,
91
    MultiRegionKey,
92
    MultiRegionKeyType,
93
    NotFoundException,
94
    NullableBooleanType,
95
    OriginType,
96
    PlaintextType,
97
    PrincipalIdType,
98
    PublicKeyType,
99
    PutKeyPolicyRequest,
100
    RecipientInfo,
101
    ReEncryptResponse,
102
    ReplicateKeyRequest,
103
    ReplicateKeyResponse,
104
    RotateKeyOnDemandRequest,
105
    RotateKeyOnDemandResponse,
106
    ScheduleKeyDeletionRequest,
107
    ScheduleKeyDeletionResponse,
108
    SignRequest,
109
    SignResponse,
110
    Tag,
111
    TagKeyList,
112
    TagList,
113
    TagResourceRequest,
114
    UnsupportedOperationException,
115
    UntagResourceRequest,
116
    UpdateAliasRequest,
117
    UpdateKeyDescriptionRequest,
118
    VerifyMacRequest,
119
    VerifyMacResponse,
120
    VerifyRequest,
121
    VerifyResponse,
122
    WrappingKeySpec,
123
)
124
from localstack.services.kms.exceptions import ValidationException
1✔
125
from localstack.services.kms.models import (
1✔
126
    MULTI_REGION_PATTERN,
127
    PATTERN_UUID,
128
    RESERVED_ALIASES,
129
    KeyImportState,
130
    KmsAlias,
131
    KmsCryptoKey,
132
    KmsGrant,
133
    KmsKey,
134
    KmsStore,
135
    deserialize_ciphertext_blob,
136
    kms_stores,
137
)
138
from localstack.services.kms.utils import (
1✔
139
    execute_dry_run_capable,
140
    get_custom_key_id,
141
    get_custom_key_material,
142
    is_valid_key_arn,
143
    parse_key_arn,
144
    validate_alias_name,
145
    validate_and_filter_tags,
146
)
147
from localstack.services.plugins import ServiceLifecycleHook
1✔
148
from localstack.state import StateVisitor
1✔
149
from localstack.utils.aws.arns import get_partition, kms_alias_arn, parse_arn
1✔
150
from localstack.utils.collections import PaginatedList
1✔
151
from localstack.utils.common import select_attributes
1✔
152
from localstack.utils.crypto import pkcs7_envelope_encrypt
1✔
153
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
154

155
LOG = logging.getLogger(__name__)
1✔
156

157
# Operation names treated as valid by this module.
# NOTE(review): presumably the allowed values of a grant's "Operations" list - confirm against the
# code that reads this constant (not visible in this part of the file).
VALID_OPERATIONS = [
    "CreateKey",
    "Decrypt",
    "Encrypt",
    "GenerateDataKey",
    "GenerateDataKeyWithoutPlaintext",
    "ReEncryptFrom",
    "ReEncryptTo",
    "Sign",
    "Verify",
    "GetPublicKey",
    "CreateGrant",
    "RetireGrant",
    "DescribeKey",
    "GenerateDataKeyPair",
    "GenerateDataKeyPairWithoutPlaintext",
]
175

176

177
class ValidationError(CommonServiceException):
    """
    Generic validation failure reported under the ``ValidationError`` error code.

    The AWS docs define this error type, but it is not part of the botocore spec.
    """

    def __init__(self, message=None):
        # The error code is fixed; only the message varies per raise site.
        super().__init__("ValidationError", message=message)
×
182

183

184
# For all operations constraints for states of keys are based on
185
# https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
186
class KmsProvider(KmsApi, ServiceLifecycleHook):
1✔
187
    """
188
    The LocalStack Key Management Service (KMS) provider.
189

190
    Cross-account access is supported by following operations where key ID belonging
191
    to another account can be used with the key ARN.
192
    - CreateGrant
193
    - DescribeKey
194
    - GetKeyRotationStatus
195
    - GetPublicKey
196
    - ListGrants
197
    - RetireGrant
198
    - RevokeGrant
199
    - Decrypt
200
    - Encrypt
201
    - GenerateDataKey
202
    - GenerateDataKeyPair
203
    - GenerateDataKeyPairWithoutPlaintext
204
    - GenerateDataKeyWithoutPlaintext
205
    - GenerateMac
206
    - ReEncrypt
207
    - Sign
208
    - Verify
209
    - VerifyMac
210
    """
211

212
    def accept_state_visitor(self, visitor: StateVisitor):
        """Pass the module-level ``kms_stores`` object to the given state visitor."""
        visitor.visit(kms_stores)
×
214

215
    #
216
    # Helpers
217
    #
218

219
    @staticmethod
    def _get_store(account_id: str, region_name: str) -> KmsStore:
        """Return the KMS store held in ``kms_stores`` for the given account ID and region name."""
        stores_of_account = kms_stores[account_id]
        return stores_of_account[region_name]
1✔
222

223
    @staticmethod
    def _create_kms_alias(account_id: str, region_name: str, request: CreateAliasRequest):
        """
        Build a ``KmsAlias`` from the request and register it in the store of the given account and region.

        The alias is stored under the request's ``AliasName``.
        """
        target_store = kms_stores[account_id][region_name]
        target_store.aliases[request.get("AliasName")] = KmsAlias(request, account_id, region_name)
1✔
229

230
    @staticmethod
    def _create_kms_key(
        account_id: str, region_name: str, request: CreateKeyRequest = None
    ) -> KmsKey:
        """
        Create a ``KmsKey`` and register it in the store of the given account and region.

        The values returned by ``get_custom_key_material(request)`` and ``get_custom_key_id(request)``
        are handed to the ``KmsKey`` constructor together with the request itself.

        :param account_id: account ID the key is created for
        :param region_name: region the key is created in
        :param request: optional CreateKey request
        :return: the newly created and stored key
        """
        target_store = kms_stores[account_id][region_name]
        custom_material = get_custom_key_material(request)
        custom_id = get_custom_key_id(request)
        new_key = KmsKey(request, account_id, region_name, custom_material, custom_id)
        target_store.keys[new_key.metadata["KeyId"]] = new_key
        return new_key
1✔
245

246
    @staticmethod
    def _get_key_id_from_any_id(account_id: str, region_name: str, some_id: str) -> str:
        """
        Translate any supported key identifier into the ID of a key that exists in the store.

        Accepted identifier forms:
        - key ID
        - key ARN
        - key alias
        - key alias ARN

        :param account_id: account whose store is searched
        :param region_name: region whose store is searched
        :param some_id: one of the identifier forms listed above
        :return: ID of the key, guaranteed to be present in ``store.keys``
        :raises NotFoundException: if a key ARN names another region, the alias is unknown, the key ID
            is malformed, or no key with that ID is stored
        :raises ValueError: if an ARN is given that is neither a key ARN nor an alias ARN
        """
        alias_name = key_id = key_arn = None

        # Step 1: classify the identifier and pull out either an alias name or a key ID.
        if not some_id.startswith("arn:"):
            if some_id.startswith("alias/"):
                alias_name = some_id
            else:
                key_id = some_id
        elif ":alias/" in some_id:
            alias_name = "alias/" + some_id.split(":alias/")[1]
        elif ":key/" in some_id:
            key_arn = some_id
            key_id = some_id.split(":key/")[1]
            arn_region = parse_arn(some_id)["region"]
            if arn_region != region_name:
                raise NotFoundException(f"Invalid arn {arn_region}")
        else:
            raise ValueError(
                f"Supplied value of {some_id} is an ARN, but neither of a KMS key nor of a KMS key "
                f"alias"
            )

        store = kms_stores[account_id][region_name]

        # Step 2: an alias is resolved to the key it targets; reserved aliases are created on demand.
        if alias_name:
            KmsProvider._create_alias_if_reserved_and_not_exists(account_id, region_name, alias_name)
            if alias_name not in store.aliases:
                raise NotFoundException(f"Unable to find KMS alias with name {alias_name}")
            key_id = store.aliases[alias_name].metadata["TargetKeyId"]

        # Step 3: regular key IDs are UUIDs, multi-region key IDs start with 'mrk-' followed by 32 hex chars.
        well_formed = PATTERN_UUID.match(key_id) or MULTI_REGION_PATTERN.match(key_id)
        if not well_formed:
            raise NotFoundException(f"Invalid keyId '{key_id}'")

        if key_id in store.keys:
            return key_id

        # The error message always carries an ARN, so build one if the caller did not pass it.
        if not key_arn:
            key_arn = f"arn:{get_partition(region_name)}:kms:{region_name}:{account_id}:key/{key_id}"
        raise NotFoundException(f"Key '{key_arn}' does not exist")
1✔
303

304
    @staticmethod
    def _create_alias_if_reserved_and_not_exists(
        account_id: str, region_name: str, alias_name: str
    ):
        """
        Lazily create a reserved alias together with a fresh key backing it.

        Nothing happens unless ``alias_name`` is listed in ``RESERVED_ALIASES`` and is not yet present
        in the store of the given account and region.
        """
        store = kms_stores[account_id][region_name]
        must_create = alias_name in RESERVED_ALIASES and alias_name not in store.aliases
        if not must_create:
            return

        backing_key = KmsProvider._create_kms_key(account_id, region_name)
        alias_request = CreateAliasRequest(
            AliasName=alias_name, TargetKeyId=backing_key.metadata.get("KeyId")
        )
        KmsProvider._create_kms_alias(account_id, region_name, alias_request)
×
317

318
    # While in AWS keys have more than Enabled, Disabled and PendingDeletion states, we currently only model these 3
319
    # in LocalStack, so this function is limited to them.
320
    #
321
    # The current default values are based on most of the operations working in AWS with enabled keys, but failing with
322
    # disabled and those pending deletion.
323
    #
324
    # If we decide to use the other states as well, we might want to come up with a better key state validation per
325
    # operation. Can consult this page for what states are supported by various operations:
326
    # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
327
    @staticmethod
    def _get_kms_key(
        account_id: str,
        region_name: str,
        any_type_of_key_id: str,
        any_key_state_allowed: bool = False,
        enabled_key_allowed: bool = True,
        disabled_key_allowed: bool = False,
        pending_deletion_key_allowed: bool = False,
    ) -> KmsKey:
        """
        Look up a key by any supported identifier and enforce the key states the caller permits.

        Only the ``Enabled``, ``Disabled`` and ``PendingDeletion`` states are checked here.

        :param account_id: account whose store is searched
        :param region_name: region whose store is searched
        :param any_type_of_key_id: identifier accepted by ``_get_key_id_from_any_id``
        :param any_key_state_allowed: shortcut that permits all three states
        :param enabled_key_allowed: permit keys in state ``Enabled``
        :param disabled_key_allowed: permit keys in state ``Disabled``
        :param pending_deletion_key_allowed: permit keys in state ``PendingDeletion``
        :return: the stored key
        :raises ValueError: if every state is prohibited
        :raises DisabledException: if the key is disabled and that is not permitted
        :raises KMSInvalidStateException: if the key is pending deletion or enabled and that is not permitted
        """
        store = kms_stores[account_id][region_name]

        if any_key_state_allowed:
            enabled_key_allowed = disabled_key_allowed = pending_deletion_key_allowed = True
        if not any((enabled_key_allowed, disabled_key_allowed, pending_deletion_key_allowed)):
            raise ValueError("A key is requested, but all possible key states are prohibited")

        key_id = KmsProvider._get_key_id_from_any_id(account_id, region_name, any_type_of_key_id)
        key = store.keys[key_id]
        state = key.metadata.get("KeyState")

        if state == "Disabled" and not disabled_key_allowed:
            raise DisabledException(f"{key.metadata.get('Arn')} is disabled.")
        if state == "PendingDeletion" and not pending_deletion_key_allowed:
            raise KMSInvalidStateException(f"{key.metadata.get('Arn')} is pending deletion.")
        if state == "Enabled" and not enabled_key_allowed:
            raise KMSInvalidStateException(
                f"{key.metadata.get('Arn')} is enabled, but the operation doesn't support "
                f"such a state"
            )
        return key
1✔
359

360
    @staticmethod
    def _get_kms_alias(account_id: str, region_name: str, alias_name_or_arn: str) -> KmsAlias:
        """
        Fetch an alias from the store of the given account and region by alias name or alias ARN.

        :raises ValidationException: if an ARN is given that does not contain ``:alias/``
        :raises NotFoundException: if no alias with the resulting name is stored
        """
        store = kms_stores[account_id][region_name]

        alias_name = alias_name_or_arn
        if alias_name_or_arn.startswith("arn:"):
            if ":alias/" not in alias_name_or_arn:
                raise ValidationException(f"{alias_name_or_arn} is not a valid alias ARN")
            alias_name = "alias/" + alias_name_or_arn.split(":alias/")[1]

        validate_alias_name(alias_name)

        if alias_name in store.aliases:
            return store.aliases.get(alias_name)

        # AWS itself uses AliasArn instead of AliasName in this exception.
        missing_arn = kms_alias_arn(alias_name, account_id, region_name)
        raise NotFoundException(f"Alias {missing_arn} is not found.")
1✔
379

380
    @staticmethod
    def _parse_key_id(key_id_or_arn: str, context: RequestContext) -> tuple[str, str, str]:
        """
        Work out where a KMS key lives: its account ID, region name and key ID.

        A valid key ARN supplies all three values itself; any other identifier is returned unchanged
        together with the account ID and region of the request context.

        :param key_id_or_arn: KMS key ID or ARN
        :param context: request context
        :return: Tuple of account ID, region name and key ID
        :raises NotFoundException: if the ARN's region differs from the region of the request
        """
        if not is_valid_key_arn(key_id_or_arn):
            return context.account_id, context.region, key_id_or_arn

        arn_account_id, arn_region, arn_key_id = parse_key_arn(key_id_or_arn)
        if arn_region != context.region:
            raise NotFoundException(f"Invalid arn {arn_region}")
        return arn_account_id, arn_region, arn_key_id
1✔
398

399
    @staticmethod
    def _is_rsa_spec(key_spec: str) -> bool:
        """Tell whether ``key_spec`` is one of the RSA_2048, RSA_3072 or RSA_4096 key specs."""
        rsa_specs = (KeySpec.RSA_2048, KeySpec.RSA_3072, KeySpec.RSA_4096)
        return key_spec in rsa_specs
1✔
402

403
    def _get_key_tags(self, account_id: str, region: str, resource_arn: str) -> TagList:
        """Return the tags stored for ``resource_arn`` as a list of ``Tag`` entries."""
        stored_tags = self._get_store(account_id, region).tags.get_tags(resource_arn)
        return [Tag(TagKey=name, TagValue=content) for name, content in stored_tags.items()]
409

410
    def _set_key_tags(self, account_id: str, region: str, resource_arn: str, tags: TagList) -> None:
        """
        Store the given tags for ``resource_arn``.

        The tags are first run through ``validate_and_filter_tags``; whatever it returns is written
        to the store as a ``TagKey`` -> ``TagValue`` mapping.
        """
        accepted_tags = validate_and_filter_tags(tags)
        tag_store = self._get_store(account_id, region).tags
        tag_store.update_tags(
            resource_arn, {entry["TagKey"]: entry["TagValue"] for entry in accepted_tags}
        )
416

417
    def _remove_key_tags(
        self, account_id: str, region: str, resource_arn: str, tag_keys: TagKeyList
    ) -> None:
        """Delete the tags with the given keys from ``resource_arn`` in the store of the account and region."""
        self._get_store(account_id, region).tags.delete_tags(resource_arn, tag_keys)
1✔
422

423
    #
424
    # Operation Handlers
425
    #
426

427
    @handler("CreateKey", expand=False)
    def create_key(
        self,
        context: RequestContext,
        request: CreateKeyRequest = None,
    ) -> CreateKeyResponse:
        """
        Create a key in the requesting account and region and attach the request's tags to it.

        Tags are validated before the key is created, so an invalid tag prevents key creation.

        :return: response carrying the metadata of the new key
        """
        requested_tags = validate_and_filter_tags(request.get("Tags", []))
        new_key = self._create_kms_key(context.account_id, context.region, request)
        if requested_tags:
            self._set_key_tags(
                context.account_id, context.region, new_key.metadata["Arn"], requested_tags
            )
        return CreateKeyResponse(KeyMetadata=new_key.metadata)
1✔
438

439
    @handler("ScheduleKeyDeletion", expand=False)
    def schedule_key_deletion(
        self, context: RequestContext, request: ScheduleKeyDeletionRequest
    ) -> ScheduleKeyDeletionResponse:
        """
        Schedule an enabled or disabled key for deletion.

        ``PendingWindowInDays`` defaults to 30 and must lie between 7 and 30 (inclusive).

        :raises ValidationException: if the pending window is outside the allowed range
        """
        pending_window = int(request.get("PendingWindowInDays", 30))
        if not 7 <= pending_window <= 30:
            raise ValidationException(
                f"PendingWindowInDays should be between 7 and 30, but it is {pending_window}"
            )

        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key.schedule_key_deletion(pending_window)

        # The response is a subset of the key metadata plus the pending window that was applied.
        response_fields = select_attributes(key.metadata, ["DeletionDate", "KeyId", "KeyState"])
        response_fields["PendingWindowInDays"] = pending_window
        return ScheduleKeyDeletionResponse(**response_fields)
1✔
460

461
    @handler("CancelKeyDeletion", expand=False)
    def cancel_key_deletion(
        self, context: RequestContext, request: CancelKeyDeletionRequest
    ) -> CancelKeyDeletionResponse:
        """
        Cancel the scheduled deletion of a key.

        Only keys pending deletion are accepted. Afterwards the key is in state ``Disabled`` and its
        ``DeletionDate`` is cleared.
        """
        key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=False,
            pending_deletion_key_allowed=True,
        )
        metadata = key.metadata
        metadata["KeyState"] = KeyState.Disabled
        metadata["DeletionDate"] = None
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_CancelKeyDeletion.html#API_CancelKeyDeletion_ResponseElements
        # "The Amazon Resource Name (key ARN) of the KMS key whose deletion is canceled."
        return CancelKeyDeletionResponse(KeyId=metadata.get("Arn"))
1✔
477

478
    @handler("DisableKey", expand=False)
    def disable_key(self, context: RequestContext, request: DisableKeyRequest) -> None:
        """Put a key into state ``Disabled`` and mark it as not enabled."""
        # Technically, AWS allows DisableKey for keys that are already disabled.
        metadata = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        ).metadata
        metadata["KeyState"] = KeyState.Disabled
        metadata["Enabled"] = False
1✔
490

491
    @handler("EnableKey", expand=False)
    def enable_key(self, context: RequestContext, request: EnableKeyRequest) -> None:
        """Put an enabled or disabled key into state ``Enabled`` and mark it as enabled."""
        metadata = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        ).metadata
        metadata["KeyState"] = KeyState.Enabled
        metadata["Enabled"] = True
1✔
502

503
    @handler("ListKeys", expand=False)
    def list_keys(self, context: RequestContext, request: ListKeysRequest) -> ListKeysResponse:
        """
        List the keys of the requesting account and region, one page at a time.

        Pagination uses the request's ``Marker`` and ``Limit``; ``NextMarker`` and ``Truncated`` are
        only part of the response when a further page exists.
        """
        store = self._get_store(context.account_id, context.region)
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeys.html#API_ListKeys_ResponseSyntax
        # Out of whole KeyMetadata only two fields are present in the response.
        entries = [
            {"KeyId": key.metadata["KeyId"], "KeyArn": key.metadata["Arn"]}
            for key in store.keys.values()
        ]
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeys.html#API_ListKeys_RequestParameters
        # Regarding the default value of Limit: "If you do not include a value, it defaults to 100."
        page, next_token = PaginatedList(entries).get_page(
            lambda key_data: key_data.get("KeyId"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 100),
        )
        if next_token:
            return ListKeysResponse(Keys=page, NextMarker=next_token, Truncated=True)
        return ListKeysResponse(Keys=page)
1✔
522

523
    @handler("DescribeKey", expand=False)
    def describe_key(
        self, context: RequestContext, request: DescribeKeyRequest
    ) -> DescribeKeyResponse:
        """Return the metadata of a key, whatever state the key is in."""
        key_location = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(*key_location, any_key_state_allowed=True)
        return DescribeKeyResponse(KeyMetadata=key.metadata)
1✔
530

531
    @handler("ReplicateKey", expand=False)
    def replicate_key(
        self, context: RequestContext, request: ReplicateKeyRequest
    ) -> ReplicateKeyResponse:
        """
        Copy a multi-region primary key into the store of another region of the same account.

        The replica is a deep copy of the primary key whose metadata is adjusted through
        ``replicate_metadata``; the primary key's replica list is extended accordingly.

        :raises UnsupportedOperationException: if the key is not multi-region or not a primary key
        :raises AlreadyExistsException: if the replica region already holds a key with the same ID
        """
        account_id = context.account_id
        primary_key = self._get_kms_key(account_id, context.region, request.get("KeyId"))
        primary_metadata = primary_key.metadata
        key_id = primary_metadata.get("KeyId")
        key_arn = primary_metadata.get("Arn")

        if not primary_metadata.get("MultiRegion"):
            raise UnsupportedOperationException(
                f"Unable to replicate a non-MultiRegion key {key_id}"
            )

        replica_region = request.get("ReplicaRegion")
        replicate_to_store = kms_stores[account_id][replica_region]

        multi_region_config = primary_metadata.get("MultiRegionConfiguration", {})
        if multi_region_config.get("MultiRegionKeyType") != MultiRegionKeyType.PRIMARY:
            raise UnsupportedOperationException(f"{key_arn} is not a multi-region primary key.")

        if key_id in replicate_to_store.keys:
            raise AlreadyExistsException(
                f"Unable to replicate key {key_id} to region {replica_region}, as the key "
                f"already exist there"
            )

        replica_key = copy.deepcopy(primary_key)
        replica_key.replicate_metadata(request, account_id, replica_region)
        replicate_to_store.keys[key_id] = replica_key

        self.update_primary_key_with_replica_keys(primary_key, replica_key, replica_region)

        # CurrentKeyMaterialId is not returned in the ReplicaKeyMetadata. May be due to not being evaluated until
        # the key has been successfully replicated as it does not show up in DescribeKey immediately either.
        response_metadata = copy.deepcopy(replica_key.metadata)
        response_metadata.pop("CurrentKeyMaterialId", None)

        return ReplicateKeyResponse(ReplicaKeyMetadata=response_metadata)
1✔
569

570
    @staticmethod
    def update_primary_key_with_replica_keys(key: KmsKey, replica_key: KmsKey, region: str):
        """
        Add a new multi-region replica key to the primary key's metadata.

        :param key: primary key whose ``MultiRegionConfiguration.ReplicaKeys`` list is extended
        :param replica_key: replica whose ARN is recorded
        :param region: region the replica lives in
        """
        replica_entry = MultiRegionKey(Arn=replica_key.metadata["Arn"], Region=region)
        key.metadata["MultiRegionConfiguration"]["ReplicaKeys"].append(replica_entry)
579

580
    @handler("UpdateKeyDescription", expand=False)
    def update_key_description(
        self, context: RequestContext, request: UpdateKeyDescriptionRequest
    ) -> None:
        """Replace the ``Description`` of an enabled or disabled key with the one from the request."""
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        new_description = request.get("Description")
        target_key.metadata["Description"] = new_description
1✔
592

593
    @handler("CreateGrant", expand=False)
    def create_grant(
        self, context: RequestContext, request: CreateGrantRequest
    ) -> CreateGrantResponse:
        """
        Create a grant for a key, or return the existing one when the request is a named duplicate.

        The request's ``KeyId`` is overwritten with the resolved key ID before validation. Grants are
        kept in the store of the requesting account and region.

        :return: response with the grant ID and the grant token
        """
        key_account_id, key_region_name, key_id = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(key_account_id, key_region_name, key_id)

        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        key_id = key.metadata.get("KeyId")
        request["KeyId"] = key_id
        self._validate_grant_request(request)
        grant_name = request.get("Name")

        store = self._get_store(context.account_id, context.region)

        # Check for existing grant with the same name for idempotency
        grant: KmsGrant | None = None
        if grant_name:
            grant = next(
                (
                    candidate
                    for candidate in store.grants.values()
                    if candidate.metadata.get("Name") == grant_name
                    and candidate.metadata.get("KeyId") == key_id
                ),
                None,
            )

        if grant is None:
            grant = KmsGrant(request, context.account_id, context.region)
            new_grant_id = grant.metadata["GrantId"]
            store.grants[new_grant_id] = grant
            store.grant_tokens[grant.token] = new_grant_id

        # At the moment we do not support multiple GrantTokens for grant creation request. Instead, we always use
        # the same token. For the reference, AWS documentation says:
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateGrant.html#API_CreateGrant_RequestParameters
        # "The returned grant token is unique with every CreateGrant request, even when a duplicate GrantId is
        # returned". "A duplicate GrantId" refers to the idempotency of grant creation requests - if a request has
        # "Name" field, and if such name already belongs to a previously created grant, no new grant gets created
        # and the existing grant with the name is returned.
        return CreateGrantResponse(GrantId=grant.metadata["GrantId"], GrantToken=grant.token)
1✔
633

634
    @handler("ListGrants", expand=False)
    def list_grants(
        self, context: RequestContext, request: ListGrantsRequest
    ) -> ListGrantsResponse:
        """
        List the grants of a key, optionally narrowed by ``GrantId`` or ``GranteePrincipal``.

        A ``GrantId`` in the request short-circuits the listing and returns just that grant. Otherwise
        the matching grants are paginated with ``Marker`` and ``Limit`` (default 50).

        :raises ValidationError: if ``KeyId`` is missing
        :raises InvalidGrantIdException: if ``GrantId`` is given but unknown
        """
        if not request.get("KeyId"):
            raise ValidationError("Required input parameter KeyId not specified")
        key_account_id, key_region_name, _ = self._parse_key_id(request["KeyId"], context)
        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        key = self._get_kms_key(
            key_account_id, key_region_name, request.get("KeyId"), any_key_state_allowed=True
        )
        key_id = key.metadata.get("KeyId")

        store = self._get_store(context.account_id, context.region)
        requested_grant_id = request.get("GrantId")
        if requested_grant_id:
            if requested_grant_id not in store.grants:
                raise InvalidGrantIdException()
            return ListGrantsResponse(Grants=[store.grants[requested_grant_id].metadata])

        grantee_principal = request.get("GranteePrincipal")

        def _belongs_in_listing(grant_metadata) -> bool:
            # KeyId is a mandatory field of ListGrants request, so is going to be present.
            _, _, grant_key_id = parse_key_arn(grant_metadata["KeyArn"])
            if grant_key_id != key_id:
                return False
            # GranteePrincipal is a mandatory field for CreateGrant, should be in grants. But it is an optional field
            # for ListGrants, so might not be there.
            return not grantee_principal or grant_metadata["GranteePrincipal"] == grantee_principal

        matching_grants = [
            grant.metadata for grant in store.grants.values() if _belongs_in_listing(grant.metadata)
        ]

        page, next_token = PaginatedList(matching_grants).get_page(
            lambda grant_data: grant_data.get("GrantId"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 50),
        )
        if next_token:
            return ListGrantsResponse(Grants=page, NextMarker=next_token, Truncated=True)
        return ListGrantsResponse(Grants=page)
1✔
677

678
    @staticmethod
    def _delete_grant(store: KmsStore, grant_id: str, key_id: str):
        """
        Remove a grant and its token from ``store``.

        The caller must make sure ``grant_id`` exists in ``store.grants``.

        :raises ValidationError: if the grant belongs to a key other than ``key_id``
        """
        doomed_grant = store.grants[grant_id]

        _, _, owning_key_id = parse_key_arn(doomed_grant.metadata.get("KeyArn"))
        if owning_key_id != key_id:
            raise ValidationError(f"Invalid KeyId={key_id} specified for grant {grant_id}")

        # Drop the token first, then the grant itself.
        store.grant_tokens.pop(doomed_grant.token)
        store.grants.pop(grant_id)
1✔
688

689
    def revoke_grant(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        grant_id: GrantIdType,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> None:
        """
        Revoke (delete) a grant of the given key.

        The key may be in any state. The grant is looked up in the store of the requesting account
        and region.

        :raises InvalidGrantIdException: if no grant with ``grant_id`` exists in that store
        """
        # TODO add support for "dry_run"
        key_location = self._parse_key_id(key_id, context)
        key = self._get_kms_key(*key_location, any_key_state_allowed=True)
        resolved_key_id = key.metadata.get("KeyId")

        store = self._get_store(context.account_id, context.region)
        if grant_id not in store.grants:
            raise InvalidGrantIdException()

        self._delete_grant(store, grant_id, resolved_key_id)
1✔
708

709
    def retire_grant(
        self,
        context: RequestContext,
        grant_token: GrantTokenType = None,
        key_id: KeyIdType = None,
        grant_id: GrantIdType = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> None:
        """
        Retire (delete) a grant identified either by its grant token or by grant ID plus key ID.

        When a grant token is supplied, the store holding the grant is derived from the account ID and
        region encoded in the token, and the grant ID is taken from that store's token index. Otherwise
        the store of the requesting account and region is used together with the given ``grant_id``.

        :param context: request context
        :param grant_token: base64-encoded token; its decoded form consists of three ``:``-separated
            fields, the first two being account ID and region name
        :param key_id: identifier of the key the grant belongs to; if omitted, the key ID is read from
            the grant itself
        :param grant_id: ID of the grant to retire; overridden when a grant token is given
        :param dry_run: accepted, but not supported yet
        :raises ValidationException: if neither a grant token nor both grant ID and key ID are given
        :raises NotFoundException: if the grant token is not known to the store it points to
        :raises InvalidGrantIdException: if the store holds no grant with the resolved grant ID
        """
        # TODO add support for "dry_run"
        if not grant_token and (not grant_id or not key_id):
            raise ValidationException("Grant token OR (grant ID, key ID) must be specified")

        if grant_token:
            decoded_token = to_str(base64.b64decode(grant_token))
            grant_account_id, grant_region_name, _ = decoded_token.split(":")
            grant_store = self._get_store(grant_account_id, grant_region_name)

            if grant_token not in grant_store.grant_tokens:
                raise NotFoundException(f"Unable to find grant token {grant_token}")

            grant_id = grant_store.grant_tokens[grant_token]
        else:
            grant_store = self._get_store(context.account_id, context.region)

        # Resolve an explicitly given key first so that key lookup errors keep their precedence.
        resolved_key_id = None
        if key_id:
            key_account_id, key_region_name, key_id = self._parse_key_id(key_id, context)
            key = self._get_kms_key(
                key_account_id, key_region_name, key_id, any_key_state_allowed=True
            )
            resolved_key_id = key.metadata.get("KeyId")

        # Same guard as in revoke_grant: an unknown grant ID must surface as InvalidGrantIdException
        # rather than as a bare KeyError from the store lookup.
        if grant_id not in grant_store.grants:
            raise InvalidGrantIdException()

        if resolved_key_id is None:
            _, _, resolved_key_id = parse_key_arn(
                grant_store.grants[grant_id].metadata.get("KeyArn")
            )

        self._delete_grant(grant_store, grant_id, resolved_key_id)
1✔
744

745
    def list_retirable_grants(
        self,
        context: RequestContext,
        retiring_principal: PrincipalIdType,
        limit: LimitType = None,
        marker: MarkerType = None,
        **kwargs,
    ) -> ListGrantsResponse:
        """List the grants in the caller's store whose ``RetiringPrincipal`` matches.

        :param context: request context; its account and region select the grant store
        :param retiring_principal: principal to match against each grant's metadata
        :param limit: page size; 50 is used when not given
        :param marker: pagination token of the page to return
        :return: one page of grant metadata, plus ``NextMarker``/``Truncated`` if more follow
        :raises ValidationError: if ``retiring_principal`` is empty
        """
        if not retiring_principal:
            raise ValidationError("Required input parameter 'RetiringPrincipal' not specified")

        store = self._get_store(context.account_id, context.region)
        retirable = []
        for grant in store.grants.values():
            if grant.metadata.get("RetiringPrincipal") == retiring_principal:
                retirable.append(grant.metadata)

        page, next_marker = PaginatedList(retirable).get_page(
            lambda grant_data: grant_data.get("GrantId"),
            next_token=marker,
            page_size=limit or 50,
        )

        if next_marker:
            return ListGrantsResponse(Grants=page, NextMarker=next_marker, Truncated=True)
        return ListGrantsResponse(Grants=page)
1✔
771

772
    def get_public_key(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        grant_tokens: GrantTokenList = None,
        **kwargs,
    ) -> GetPublicKeyResponse:
        """Return the public key of a KMS key together with selected key metadata.

        :param context: request context used to resolve ``key_id``
        :param key_id: identifier of the key
        :param grant_tokens: not used by this implementation
        :return: ``KeySpec``, ``KeyUsage``, ``EncryptionAlgorithms`` and ``SigningAlgorithms``
            as selected from the key metadata, plus ``PublicKey`` and ``KeyId`` (the key ARN)
        """
        # According to https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html, GetPublicKey is supposed
        # to fail for disabled keys. But it actually doesn't fail in AWS.
        account_id, region_name, parsed_key_id = self._parse_key_id(key_id, context)
        kms_key = self._get_kms_key(
            account_id,
            region_name,
            parsed_key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        response_fields = select_attributes(
            kms_key.metadata,
            ["KeySpec", "KeyUsage", "EncryptionAlgorithms", "SigningAlgorithms"],
        )
        response_fields["PublicKey"] = kms_key.crypto_key.public_key
        response_fields["KeyId"] = kms_key.metadata["Arn"]
        return GetPublicKeyResponse(**response_fields)
1✔
799

800
    def _generate_data_key_pair(
        self,
        context: RequestContext,
        key_id: str,
        key_pair_spec: str,
        encryption_context: EncryptionContextType = None,
        dry_run: NullableBooleanType = None,
    ):
        """Validate the request and build a data key pair response (dry-run capable).

        The wrapping key is resolved and validated for encryption/decryption, the key pair
        spec is validated, and the actual response construction is delegated to
        ``execute_dry_run_capable`` with ``_build_data_key_pair_response``.
        """
        account_id, region_name, parsed_key_id = self._parse_key_id(key_id, context)
        wrapping_key = self._get_kms_key(account_id, region_name, parsed_key_id)
        self._validate_key_for_encryption_decryption(context, wrapping_key)
        KmsCryptoKey.assert_valid(key_pair_spec)

        return execute_dry_run_capable(
            self._build_data_key_pair_response,
            dry_run,
            wrapping_key,
            key_pair_spec,
            encryption_context,
        )
815

816
    def _build_data_key_pair_response(
        self, key: KmsKey, key_pair_spec: str, encryption_context: EncryptionContextType = None
    ):
        """Create a new key pair of ``key_pair_spec`` and wrap its private key with ``key``.

        :return: dict with the wrapping key ARN, the spec, the private key both encrypted
            under ``key`` and in plaintext, and the public key
        """
        data_key_pair = KmsCryptoKey(key_pair_spec)

        wrapping_key_arn = key.metadata["Arn"]
        encrypted_private_key = key.encrypt(data_key_pair.private_key, encryption_context)

        return {
            "KeyId": wrapping_key_arn,
            "KeyPairSpec": key_pair_spec,
            "PrivateKeyCiphertextBlob": encrypted_private_key,
            "PrivateKeyPlaintext": data_key_pair.private_key,
            "PublicKey": data_key_pair.public_key,
        }
828

829
    @handler("GenerateDataKeyPair")
    def generate_data_key_pair(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_pair_spec: DataKeyPairSpec,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        recipient: RecipientInfo = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> GenerateDataKeyPairResponse:
        """Generate a data key pair and return it with the private key in plaintext and encrypted.

        ``grant_tokens`` and ``recipient`` are accepted but not used here.
        """
        key_pair_fields = self._generate_data_key_pair(
            context,
            key_id,
            key_pair_spec,
            encryption_context=encryption_context,
            dry_run=dry_run,
        )
        return GenerateDataKeyPairResponse(**key_pair_fields)
1✔
845

846
    @handler("GenerateRandom", expand=False)
    def generate_random(
        self, context: RequestContext, request: GenerateRandomRequest
    ) -> GenerateRandomResponse:
        """Return ``NumberOfBytes`` random bytes obtained from ``os.urandom``.

        :raises ValidationException: if ``NumberOfBytes`` is missing, above 1024 or below 1
        """
        number_of_bytes = request.get("NumberOfBytes")
        if number_of_bytes is None:
            raise ValidationException("NumberOfBytes is required.")

        # The upper bound is checked before the lower bound.
        validation_message = None
        if number_of_bytes > 1024:
            validation_message = (
                f"1 validation error detected: Value '{number_of_bytes}' at 'numberOfBytes' failed "
                "to satisfy constraint: Member must have value less than or equal to 1024"
            )
        elif number_of_bytes < 1:
            validation_message = (
                f"1 validation error detected: Value '{number_of_bytes}' at 'numberOfBytes' failed "
                "to satisfy constraint: Member must have value greater than or equal to 1"
            )
        if validation_message is not None:
            raise ValidationException(validation_message)

        return GenerateRandomResponse(Plaintext=os.urandom(number_of_bytes))
1✔
867

868
    @handler("GenerateDataKeyPairWithoutPlaintext")
    def generate_data_key_pair_without_plaintext(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_pair_spec: DataKeyPairSpec,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> GenerateDataKeyPairWithoutPlaintextResponse:
        """Generate a data key pair and return it without the plaintext private key.

        Builds the same fields as ``generate_data_key_pair`` and drops
        ``PrivateKeyPlaintext`` before constructing the response. ``grant_tokens`` is
        accepted but not used here.
        """
        result = self._generate_data_key_pair(
            context, key_id, key_pair_spec, encryption_context, dry_run
        )
        result.pop("PrivateKeyPlaintext")
        # Build the response type this operation is declared to return.
        return GenerateDataKeyPairWithoutPlaintextResponse(**result)
1✔
884

885
    # We currently act neither on the KeySpec setting (which is different from and holds values different than
886
    # KeySpec for CreateKey) nor on NumberOfBytes. Instead, we generate a key with a key length that is "standard" in
887
    # LocalStack.
888
    #
889
    def _generate_data_key(
        self, context: RequestContext, key_id: str, encryption_context: EncryptionContextType = None
    ):
        """Create a new ``SYMMETRIC_DEFAULT`` data key and encrypt it under the given KMS key.

        :return: dict with the wrapping key ARN (``KeyId``), the data key material
            (``Plaintext``) and that material encrypted under the key (``CiphertextBlob``)
        """
        account_id, region_name, parsed_key_id = self._parse_key_id(key_id, context)
        wrapping_key = self._get_kms_key(account_id, region_name, parsed_key_id)
        # TODO Should also have a validation for the key being a symmetric one.
        self._validate_key_for_encryption_decryption(context, wrapping_key)

        data_key = KmsCryptoKey("SYMMETRIC_DEFAULT")
        wrapping_key_arn = wrapping_key.metadata["Arn"]
        data_key_material = data_key.key_material
        return {
            "KeyId": wrapping_key_arn,
            "Plaintext": data_key_material,
            "CiphertextBlob": wrapping_key.encrypt(data_key_material, encryption_context),
        }
902

903
    @handler("GenerateDataKey", expand=False)
    def generate_data_key(
        self, context: RequestContext, request: GenerateDataKeyRequest
    ) -> GenerateDataKeyResponse:
        """Generate a data key and return it both in plaintext and encrypted under ``KeyId``."""
        target_key_id = request.get("KeyId")
        encryption_context = request.get("EncryptionContext")
        data_key_fields = self._generate_data_key(context, target_key_id, encryption_context)
        return GenerateDataKeyResponse(**data_key_fields)
1✔
911

912
    @handler("GenerateDataKeyWithoutPlaintext", expand=False)
    def generate_data_key_without_plaintext(
        self, context: RequestContext, request: GenerateDataKeyWithoutPlaintextRequest
    ) -> GenerateDataKeyWithoutPlaintextResponse:
        """Generate a data key and return only its encrypted form (no ``Plaintext``)."""
        target_key_id = request.get("KeyId")
        encryption_context = request.get("EncryptionContext")
        data_key_fields = self._generate_data_key(context, target_key_id, encryption_context)
        # The plaintext key material must not be part of this response.
        del data_key_fields["Plaintext"]
        return GenerateDataKeyWithoutPlaintextResponse(**data_key_fields)
1✔
921

922
    @handler("GenerateMac", expand=False)
    def generate_mac(
        self,
        context: RequestContext,
        request: GenerateMacRequest,
    ) -> GenerateMacResponse:
        """Compute a MAC over ``Message`` with the requested key and ``MacAlgorithm``.

        The message length is validated first, then the key (usable for MAC generation /
        verification), then the algorithm against the key.
        """
        message = request.get("Message")
        self._validate_mac_msg_length(message)

        account_id, region_name, parsed_key_id = self._parse_key_id(request["KeyId"], context)
        mac_key = self._get_kms_key(account_id, region_name, parsed_key_id)
        self._validate_key_for_generate_verify_mac(context, mac_key)

        mac_algorithm = request.get("MacAlgorithm")
        self._validate_mac_algorithm(mac_key, mac_algorithm)

        computed_mac = mac_key.generate_mac(message, mac_algorithm)
        return GenerateMacResponse(
            Mac=computed_mac,
            MacAlgorithm=mac_algorithm,
            KeyId=mac_key.metadata.get("Arn"),
        )
1✔
942

943
    @handler("VerifyMac", expand=False)
    def verify_mac(
        self,
        context: RequestContext,
        request: VerifyMacRequest,
    ) -> VerifyMacResponse:
        """Verify ``Mac`` against ``Message`` using the requested key and ``MacAlgorithm``.

        Runs the same validations as ``generate_mac`` (message length, key, algorithm)
        before delegating to the key's ``verify_mac``.
        """
        message = request.get("Message")
        self._validate_mac_msg_length(message)

        account_id, region_name, parsed_key_id = self._parse_key_id(request["KeyId"], context)
        mac_key = self._get_kms_key(account_id, region_name, parsed_key_id)
        self._validate_key_for_generate_verify_mac(context, mac_key)

        mac_algorithm = request.get("MacAlgorithm")
        self._validate_mac_algorithm(mac_key, mac_algorithm)

        is_mac_valid = mac_key.verify_mac(message, request.get("Mac"), mac_algorithm)
        return VerifyMacResponse(
            KeyId=mac_key.metadata.get("Arn"),
            MacValid=is_mac_valid,
            MacAlgorithm=mac_algorithm,
        )
965

966
    @handler("Sign", expand=False)
    def sign(self, context: RequestContext, request: SignRequest) -> SignResponse:
        """Sign ``Message`` with the requested key and ``SigningAlgorithm``.

        :return: the key ARN (as ``KeyId``), the signature and the algorithm used
        """
        account_id, region_name, parsed_key_id = self._parse_key_id(request["KeyId"], context)
        signing_key = self._get_kms_key(account_id, region_name, parsed_key_id)
        self._validate_key_for_sign_verify(context, signing_key)

        # TODO Add constraints on KeySpec / SigningAlgorithm pairs:
        #  https://docs.aws.amazon.com/kms/latest/developerguide/asymmetric-key-specs.html#key-spec-ecc

        algorithm = request.get("SigningAlgorithm")
        message = request.get("Message")
        message_type = request.get("MessageType")
        signature = signing_key.sign(message, message_type, algorithm)

        return SignResponse(
            KeyId=signing_key.metadata["Arn"],
            Signature=signature,
            SigningAlgorithm=algorithm,
        )
1✔
985

986
    # Currently LocalStack only calculates SHA256 digests no matter what the signing algorithm is.
987
    @handler("Verify", expand=False)
    def verify(self, context: RequestContext, request: VerifyRequest) -> VerifyResponse:
        """Verify ``Signature`` over ``Message`` with the requested key and ``SigningAlgorithm``.

        :return: the key ARN (as ``KeyId``), the verification result and the algorithm used
        """
        account_id, region_name, parsed_key_id = self._parse_key_id(request["KeyId"], context)
        verification_key = self._get_kms_key(account_id, region_name, parsed_key_id)
        self._validate_key_for_sign_verify(context, verification_key)

        algorithm = request.get("SigningAlgorithm")
        signature_valid = verification_key.verify(
            request.get("Message"),
            request.get("MessageType"),
            algorithm,
            request.get("Signature"),
        )

        return VerifyResponse(
            KeyId=verification_key.metadata["Arn"],
            SignatureValid=signature_valid,
            SigningAlgorithm=algorithm,
        )
1✔
1008

1009
    def re_encrypt(
        self,
        context: RequestContext,
        destination_key_id: KeyIdType,
        ciphertext_blob: CiphertextType | None = None,
        source_encryption_context: EncryptionContextType | None = None,
        source_key_id: KeyIdType | None = None,
        destination_encryption_context: EncryptionContextType | None = None,
        source_encryption_algorithm: EncryptionAlgorithmSpec | None = None,
        destination_encryption_algorithm: EncryptionAlgorithmSpec | None = None,
        grant_tokens: GrantTokenList | None = None,
        dry_run: NullableBooleanType | None = None,
        dry_run_modifiers: DryRunModifierList | None = None,
        **kwargs,
    ) -> ReEncryptResponse:
        """Decrypt ``ciphertext_blob`` with the source key and re-encrypt it with the destination key.

        Key resolution and validation are left to ``decrypt`` and ``encrypt``, which receive
        the key identifiers exactly as supplied. This lets ``source_key_id`` be omitted (in
        which case ``decrypt`` takes the key from the ciphertext blob) and avoids reducing an
        identifier to a bare key ID before it is parsed again against the caller's context.

        :param destination_key_id: key to encrypt the plaintext with
        :param ciphertext_blob: data to re-encrypt
        :param source_encryption_context: encryption context used for decryption
        :param source_key_id: key to decrypt with; optional
        :param destination_encryption_context: encryption context used for encryption
        :param source_encryption_algorithm: echoed in the response and passed to ``decrypt``
        :param destination_encryption_algorithm: echoed in the response
        :param dry_run: forwarded to ``encrypt`` only
        :return: new ciphertext plus the ARNs of the source and destination keys, as
            reported by ``decrypt`` and ``encrypt``
        """
        # Decrypt using the source key (or the key recorded in the blob if none is given).
        decrypt_response = self.decrypt(
            context=context,
            ciphertext_blob=ciphertext_blob,
            encryption_context=source_encryption_context,
            encryption_algorithm=source_encryption_algorithm,
            key_id=source_key_id,
            grant_tokens=grant_tokens,
        )
        # Encrypt using the destination key.
        encrypt_response = self.encrypt(
            context=context,
            encryption_context=destination_encryption_context,
            key_id=destination_key_id,
            plaintext=decrypt_response["Plaintext"],
            grant_tokens=grant_tokens,
            dry_run=dry_run,
        )
        return ReEncryptResponse(
            CiphertextBlob=encrypt_response["CiphertextBlob"],
            # Both responses carry the ARN of the key that was actually used.
            SourceKeyId=decrypt_response["KeyId"],
            KeyId=encrypt_response["KeyId"],
            SourceEncryptionAlgorithm=source_encryption_algorithm,
            DestinationEncryptionAlgorithm=destination_encryption_algorithm,
        )
1058

1059
    def encrypt(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        plaintext: PlaintextType,
        encryption_context: EncryptionContextType = None,
        grant_tokens: GrantTokenList = None,
        encryption_algorithm: EncryptionAlgorithmSpec = None,
        dry_run: NullableBooleanType = None,
        **kwargs,
    ) -> EncryptResponse:
        """Encrypt ``plaintext`` with the given KMS key.

        Validations run in this order: plaintext length, plaintext against key type and
        algorithm, key usable for encryption/decryption, key not pending import.

        :return: the ciphertext blob, the key ARN (as ``KeyId``) and the requested
            ``encryption_algorithm`` echoed back
        """
        # TODO add support for "dry_run"
        account_id, region_name, parsed_key_id = self._parse_key_id(key_id, context)
        encryption_key = self._get_kms_key(account_id, region_name, parsed_key_id)

        self._validate_plaintext_length(plaintext)
        self._validate_plaintext_key_type_based(plaintext, encryption_key, encryption_algorithm)
        self._validate_key_for_encryption_decryption(context, encryption_key)
        self._validate_key_state_not_pending_import(encryption_key)

        encrypted_blob = encryption_key.encrypt(plaintext, encryption_context)
        # For compatibility, we return EncryptionAlgorithm values expected from AWS. But LocalStack currently always
        # encrypts with symmetric encryption no matter the key settings.
        return EncryptResponse(
            CiphertextBlob=encrypted_blob,
            KeyId=encryption_key.metadata.get("Arn"),
            EncryptionAlgorithm=encryption_algorithm,
        )
1086

1087
    # TODO We currently do not even check encryption_context, while moto does. Should add the corresponding logic later.
1088
    def decrypt(
        self,
        context: RequestContext,
        ciphertext_blob: CiphertextType | None = None,
        encryption_context: EncryptionContextType | None = None,
        grant_tokens: GrantTokenList | None = None,
        key_id: KeyIdType | None = None,
        encryption_algorithm: EncryptionAlgorithmSpec | None = None,
        recipient: RecipientInfo | None = None,
        dry_run: NullableBooleanType | None = None,
        dry_run_modifiers: DryRunModifierList | None = None,
        **kwargs,
    ) -> DecryptResponse:
        """Decrypt ``ciphertext_blob`` and return the plaintext, or re-encrypt it for a recipient.

        :param ciphertext_blob: data to decrypt
        :param encryption_context: passed to the key's ``decrypt``
        :param key_id: key to decrypt with; if omitted it is read from the deserialized blob
        :param encryption_algorithm: echoed back in the response
        :param recipient: if it carries an attestation document from which a public key can be
            extracted, the plaintext is returned as ``CiphertextForRecipient`` instead
        :raises InvalidCiphertextException: if the blob cannot be deserialized and no key ID was
            given, if a non-RSA decryption has no deserialized blob, or on ``InvalidTag``
        :raises IncorrectKeyException: if the blob was deserialized and names a different key
        """
        # In AWS, key_id is only supplied for data encrypted with an asymmetrical algorithm. For symmetrical
        # encryption, key_id is taken from the encrypted data itself.
        # Since LocalStack doesn't currently do asymmetrical encryption, there is a question of modeling here: we
        # currently expect data to be only encrypted with symmetric encryption, so having key_id inside. It might not
        # always be what customers expect.
        if key_id:
            account_id, region_name, key_id = self._parse_key_id(key_id, context)
            # With an explicit key ID an undeserializable blob is tolerated here; it is handled
            # further down (raw RSA ciphertext) or rejected there.
            try:
                ciphertext = deserialize_ciphertext_blob(ciphertext_blob=ciphertext_blob)
            except Exception as e:
                logging.error("Error deserializing ciphertext blob: %s", e)
                ciphertext = None
        else:
            # Without a key ID the blob is the only source of the key, so it must deserialize.
            try:
                ciphertext = deserialize_ciphertext_blob(ciphertext_blob=ciphertext_blob)
                account_id, region_name, key_id = self._parse_key_id(ciphertext.key_id, context)
            except Exception:
                raise InvalidCiphertextException(
                    "LocalStack is unable to deserialize the ciphertext blob. Perhaps the "
                    "blob didn't come from LocalStack"
                )

        key = self._get_kms_key(account_id, region_name, key_id)
        if ciphertext and key.metadata["KeyId"] != ciphertext.key_id:
            raise IncorrectKeyException(
                "The key ID in the request does not identify a CMK that can perform this operation."
            )

        self._validate_key_for_encryption_decryption(context, key)
        self._validate_key_state_not_pending_import(key)

        # Handle the recipient field.  This is used by AWS Nitro to re-encrypt the plaintext to the key specified
        # by the enclave.  Proper support for this will take significant work to figure out how to model enforcing
        # the attestation measurements; for now, if recipient is specified and has an attestation doc in it including
        # a public key where it's expected to be, we encrypt to that public key.  This at least allows users to use
        # localstack as a drop-in replacement for AWS when testing without having to skip the secondary decryption
        # when using localstack.
        recipient_pubkey = None
        if recipient:
            attestation_document = recipient["AttestationDocument"]
            # We do all of this in a try/catch and warn if it fails so that if users are currently passing a nonsense
            # value we don't break it for them.  In the future we could do a breaking change to require a valid attestation
            # (or at least one that contains the public key).
            try:
                recipient_pubkey = self._extract_attestation_pubkey(attestation_document)
            except Exception as e:
                logging.warning(
                    "Unable to extract public key from non-empty attestation document: %s", e
                )

        try:
            # TODO: Extend the implementation to handle additional encryption/decryption scenarios
            # beyond the current support for offline encryption and online decryption using RSA keys if key id exists in
            # parameters, where `ciphertext_blob` will not be deserializable.
            is_raw_rsa_blob = self._is_rsa_spec(key.crypto_key.key_spec) and not ciphertext
            if is_raw_rsa_blob:
                plaintext = key.decrypt_rsa(ciphertext_blob)
            elif ciphertext is None:
                # if symmetric encryption then ciphertext must not be None
                raise InvalidCiphertextException()
            else:
                plaintext = key.decrypt(ciphertext, encryption_context)
        except InvalidTag:
            raise InvalidCiphertextException()

        # For compatibility, we return EncryptionAlgorithm values expected from AWS. But LocalStack currently always
        # encrypts with symmetric encryption no matter the key settings.
        #
        # We return a key ARN instead of KeyId despite the name of the parameter, as this is what AWS does and states
        # in its docs.
        #  https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html#API_Decrypt_RequestSyntax
        # TODO add support for "dry_run"
        response = DecryptResponse(
            KeyId=key.metadata.get("Arn"),
            EncryptionAlgorithm=encryption_algorithm,
        )

        # Encrypt to the recipient pubkey if specified.  Otherwise, return the actual plaintext
        if not recipient_pubkey:
            response["Plaintext"] = plaintext
        else:
            response["CiphertextForRecipient"] = pkcs7_envelope_encrypt(plaintext, recipient_pubkey)

        return response
1✔
1185

1186
    def get_parameters_for_import(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        wrapping_algorithm: AlgorithmSpec,
        wrapping_key_spec: WrappingKeySpec,
        **kwargs,
    ) -> GetParametersForImportResponse:
        """Create a wrapping key and an import token for importing key material into a key.

        A fresh ``KmsKey`` of ``wrapping_key_spec`` is created and remembered, together with
        the target key ID and ``wrapping_algorithm``, in ``store.imports`` under a new token.

        :return: the target key ARN, the import token, the wrapping public key and the
            expiry date of these parameters
        :raises UnsupportedOperationException: if the target key's ``Origin`` is not ``EXTERNAL``
        """
        store = self._get_store(context.account_id, context.region)
        # KeyId can potentially hold one of multiple different types of key identifiers. get_key finds a key no
        # matter which type of id is used.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        target_key_arn = target_key.metadata["Arn"]
        target_key_origin = target_key.metadata.get("Origin")

        if target_key_origin != "EXTERNAL":
            raise UnsupportedOperationException(
                f"{target_key_arn} origin is {target_key_origin} which is not valid for this operation."
            )

        target_key_id = target_key.metadata["KeyId"]

        wrapping_key = KmsKey(CreateKeyRequest(KeySpec=wrapping_key_spec))
        token = short_uid()
        import_state = KeyImportState(
            key_id=target_key_id,
            import_token=token,
            wrapping_algo=wrapping_algorithm,
            key=wrapping_key,
        )
        store.imports[token] = import_state

        # https://docs.aws.amazon.com/kms/latest/APIReference/API_GetParametersForImport.html
        # "To import key material, you must use the public key and import token from the same response. These items
        # are valid for 24 hours."
        # NOTE(review): the value below is 100 days, not the 24 hours quoted above - confirm this is intended.
        validity = datetime.timedelta(days=100)
        expiry_date = datetime.datetime.now() + validity
        return GetParametersForImportResponse(
            KeyId=target_key.metadata["Arn"],
            ImportToken=to_bytes(import_state.import_token),
            PublicKey=import_state.key.crypto_key.public_key,
            ParametersValidTo=expiry_date,
        )
1230

1231
    def import_key_material(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        import_token: CiphertextType,
        encrypted_key_material: CiphertextType,
        valid_to: DateType | None = None,
        expiration_model: ExpirationModelType | None = None,
        import_type: ImportType | None = None,
        key_material_description: KeyMaterialDescriptionType | None = None,
        key_material_id: BackingKeyIdType | None = None,
        **kwargs,
    ) -> ImportKeyMaterialResponse:
        """Import wrapped key material into a KMS key using a previously issued import token.

        All request validation happens before the key is modified, so a rejected request
        leaves the key's metadata (including ``ExpirationModel``) untouched.

        :param key_id: key to import the material into
        :param import_token: token returned by ``get_parameters_for_import``
        :param encrypted_key_material: key material wrapped for the import's wrapping key
        :param valid_to: required when the expiration model is ``KEY_MATERIAL_EXPIRES``
        :param expiration_model: defaults to ``KEY_MATERIAL_EXPIRES`` when not given
        :param key_material_id: ignored; the ID is derived from the imported material
        :return: the key ARN and, for ``SYMMETRIC_DEFAULT`` keys only, the key material ID
        :raises NotFoundException: if the import token is unknown
        :raises ValidationException: if the model is ``KEY_MATERIAL_EXPIRES`` and no
            ``valid_to`` is given
        :raises KMSInvalidStateException: if other key material is still pending rotation
        """
        store = self._get_store(context.account_id, context.region)
        import_token = to_str(import_token)
        import_state = store.imports.get(import_token)
        if not import_state:
            raise NotFoundException(f"Unable to find key import token '{import_token}'")
        key_to_import_material_to = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )

        # TODO check if there was already a key imported for this kms key
        # if so, it has to be identical. We cannot change keys by reimporting after deletion/expiry
        key_material = self._decrypt_wrapped_key_material(import_state, encrypted_key_material)
        key_material_id = key_to_import_material_to.generate_key_material_id(key_material)

        # Validate first and only touch the key's metadata once the request is accepted.
        effective_expiration_model = (
            expiration_model or ExpirationModelType.KEY_MATERIAL_EXPIRES
        )
        if (
            effective_expiration_model == ExpirationModelType.KEY_MATERIAL_EXPIRES
            and not valid_to
        ):
            raise ValidationException(
                "A validTo date must be set if the ExpirationModel is KEY_MATERIAL_EXPIRES"
            )
        if existing_pending_material := key_to_import_material_to.crypto_key.pending_key_material:
            pending_key_material_id = key_to_import_material_to.generate_key_material_id(
                existing_pending_material
            )
            raise KMSInvalidStateException(
                f"New key material (id: {key_material_id}) cannot be imported into KMS key "
                f"{key_to_import_material_to.metadata['Arn']}, because another key material "
                f"(id: {pending_key_material_id}) is pending rotation."
            )

        # TODO actually set validTo and make the key expire
        key_to_import_material_to.metadata["ExpirationModel"] = effective_expiration_model
        key_to_import_material_to.metadata["Enabled"] = True
        key_to_import_material_to.metadata["KeyState"] = KeyState.Enabled
        key_to_import_material_to.crypto_key.load_key_material(key_material)

        # KeyMaterialId / CurrentKeyMaterialId is only exposed for symmetric encryption keys.
        key_material_id_response = None
        if key_to_import_material_to.metadata["KeySpec"] == KeySpec.SYMMETRIC_DEFAULT:
            key_material_id_response = key_to_import_material_to.generate_key_material_id(
                key_material
            )

            # If there is no CurrentKeyMaterialId, instantly promote the pending key material to the current.
            if key_to_import_material_to.metadata.get("CurrentKeyMaterialId") is None:
                key_to_import_material_to.metadata["CurrentKeyMaterialId"] = (
                    key_material_id_response
                )
                key_to_import_material_to.crypto_key.key_material = (
                    key_to_import_material_to.crypto_key.pending_key_material
                )
                key_to_import_material_to.crypto_key.pending_key_material = None

        return ImportKeyMaterialResponse(
            KeyId=key_to_import_material_to.metadata["Arn"],
            KeyMaterialId=key_material_id_response,
        )
1308

1309
    def delete_imported_key_material(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_material_id: BackingKeyIdType | None = None,
        **kwargs,
    ) -> DeleteImportedKeyMaterialResponse:
        """Drop the key material of a key and put the key back into the ``PendingImport`` state.

        The key is disabled and its ``ExpirationModel`` metadata entry, if any, is removed.
        ``key_material_id`` is currently ignored.

        :return: an empty response
        """
        # TODO add support for key_material_id
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        target_key.crypto_key.key_material = None

        key_metadata = target_key.metadata
        key_metadata["Enabled"] = False
        key_metadata["KeyState"] = KeyState.PendingImport
        key_metadata.pop("ExpirationModel", None)

        # TODO populate DeleteImportedKeyMaterialResponse
        return DeleteImportedKeyMaterialResponse()
1✔
1331

1332
    @handler("CreateAlias", expand=False)
    def create_alias(self, context: RequestContext, request: CreateAliasRequest) -> None:
        """Create an alias for a KMS key in the caller's account and region.

        The request's ``TargetKeyId`` is replaced by the resolved key's ``KeyId`` before the
        alias is created.

        :raises AlreadyExistsException: if an alias with this name already exists
        """
        store = self._get_store(context.account_id, context.region)
        alias_name = request["AliasName"]
        validate_alias_name(alias_name)

        if alias_name in store.aliases:
            existing_alias_arn = store.aliases[alias_name].metadata["AliasArn"]
            # AWS itself uses AliasArn instead of AliasName in this exception.
            raise AlreadyExistsException(
                f"An alias with the name {existing_alias_arn} already exists"
            )

        # KeyId can potentially hold one of multiple different types of key identifiers. Here we find a key no
        # matter which type of id is used.
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("TargetKeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        request["TargetKeyId"] = target_key.metadata.get("KeyId")
        self._create_kms_alias(context.account_id, context.region, request)
1✔
1352

1353
    @handler("DeleteAlias", expand=False)
    def delete_alias(self, context: RequestContext, request: DeleteAliasRequest) -> None:
        """Remove an alias from the caller's account/region store.

        :raises NotFoundException: if no alias with the given name exists
        """
        # We do not check the state of the key, as, according to AWS docs, all key states, that are possible in
        # LocalStack, are supported by this operation.
        aliases = self._get_store(context.account_id, context.region).aliases
        alias_name = request["AliasName"]

        if alias_name not in aliases:
            missing_alias_arn = kms_alias_arn(alias_name, context.account_id, context.region)
            # AWS itself uses AliasArn instead of AliasName in this exception.
            raise NotFoundException(f"Alias {missing_alias_arn} is not found")

        aliases.pop(alias_name, None)
1✔
1364

1365
    @handler("UpdateAlias", expand=False)
    def update_alias(self, context: RequestContext, request: UpdateAliasRequest) -> None:
        """Re-point an existing alias at a different KMS key.

        The alias name is validated and looked up, the destination key is resolved
        (enabled and disabled keys are accepted), and the alias metadata is updated
        with the destination key's ``KeyId`` together with a new last-update date.
        """
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the source KMS key is pending deletion, the command succeeds. If the destination KMS key is pending
        # deletion, the command fails with error: KMSInvalidStateException : <key ARN> is pending deletion."
        # Also disabled keys are accepted for this operation (see the table on that page).
        #
        # As such, we do not care about the state of the source key, but check the destination one.

        alias_name = request["AliasName"]
        # This API, per AWS docs, accepts only names, not ARNs.
        validate_alias_name(alias_name)
        alias = self._get_kms_alias(context.account_id, context.region, alias_name)

        # The lookup validates the destination key's state and resolves whichever
        # identifier type the caller supplied.
        key = self._get_kms_key(
            context.account_id,
            context.region,
            request["TargetKeyId"],
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        # Store the normalized key ID (as create_alias does) rather than the raw request
        # value, so that the TargetKeyId comparison in list_aliases keeps matching when
        # the caller passed a key ARN.
        alias.metadata["TargetKeyId"] = key.metadata.get("KeyId")
        alias.update_date_of_last_update()
1✔
1389

1390
    @handler("ListAliases")
    def list_aliases(
        self,
        context: RequestContext,
        key_id: KeyIdType = None,
        limit: LimitType = None,
        marker: MarkerType = None,
        **kwargs,
    ) -> ListAliasesResponse:
        """Return one page of alias metadata, optionally restricted to a single key.

        When ``key_id`` is given it is resolved to the key's ``KeyId`` (any key state
        is allowed) and only aliases whose ``TargetKeyId`` equals it are returned.
        The page size defaults to 100. ``NextMarker`` and ``Truncated`` are only
        included when a further page exists.
        """
        store = self._get_store(context.account_id, context.region)

        if key_id:
            # key_id may be any of the supported key identifier types; normalize it
            # to the plain key ID used in the alias metadata.
            resolved_key = self._get_kms_key(
                context.account_id, context.region, key_id, any_key_state_allowed=True
            )
            key_id = resolved_key.metadata.get("KeyId")

        selected = [
            alias.metadata
            for alias in store.aliases.values()
            if not key_id or alias.metadata["TargetKeyId"] == key_id
        ]

        page, next_token = PaginatedList(selected).get_page(
            lambda alias_metadata: alias_metadata.get("AliasName"),
            next_token=marker,
            page_size=limit or 100,
        )
        if next_token:
            return ListAliasesResponse(Aliases=page, NextMarker=next_token, Truncated=True)
        return ListAliasesResponse(Aliases=page)
1✔
1422

1423
    @handler("GetKeyRotationStatus", expand=False)
    def get_key_rotation_status(
        self, context: RequestContext, request: GetKeyRotationStatusRequest
    ) -> GetKeyRotationStatusResponse:
        """Report whether rotation is enabled for a key and when it rotates next.

        ``RotationPeriodInDays`` is only part of the response while rotation is
        enabled. Keys in any state are accepted.
        """
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        # We do not model that here, though.
        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        key = self._get_kms_key(account_id, region_name, key_id, any_key_state_allowed=True)

        status = GetKeyRotationStatusResponse(
            KeyId=key.metadata["Arn"],
            KeyRotationEnabled=key.is_key_rotation_enabled,
            NextRotationDate=key.next_rotation_date,
        )
        if not key.is_key_rotation_enabled:
            return status

        status["RotationPeriodInDays"] = key.rotation_period_in_days
        return status
1✔
1442

1443
    @handler("DisableKeyRotation", expand=False)
    def disable_key_rotation(
        self, context: RequestContext, request: DisableKeyRotationRequest
    ) -> None:
        """Switch off key rotation for the requested key."""
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        # We do not model that here, though.
        requested_key_id = request.get("KeyId")
        target_key = self._get_kms_key(context.account_id, context.region, requested_key_id)
        target_key.is_key_rotation_enabled = False
1✔
1452

1453
    @handler("EnableKeyRotation", expand=False)
    def enable_key_rotation(
        self, context: RequestContext, request: EnableKeyRotationRequest
    ) -> None:
        """Switch on key rotation for the requested key.

        A truthy ``RotationPeriodInDays`` in the request overrides the key's
        rotation period. The key's rotation date is refreshed afterwards.
        """
        # https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
        # "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
        # We do not model that here, though.
        target_key = self._get_kms_key(context.account_id, context.region, request.get("KeyId"))
        target_key.is_key_rotation_enabled = True

        if rotation_period := request.get("RotationPeriodInDays"):
            target_key.rotation_period_in_days = rotation_period

        target_key._update_key_rotation_date()
1✔
1465

1466
    @handler("ListKeyPolicies", expand=False)
    def list_key_policies(
        self, context: RequestContext, request: ListKeyPoliciesRequest
    ) -> ListKeyPoliciesResponse:
        """Return the policy names of a key, which is always just ``default``.

        The key lookup is performed only to verify that the key exists; keys in
        any state are accepted.
        """
        # The response, by AWS specifications, is the same for all keys, as the only
        # supported policy is "default":
        # https://docs.aws.amazon.com/kms/latest/APIReference/API_ListKeyPolicies.html#API_ListKeyPolicies_ResponseElements
        requested_key_id = request.get("KeyId")
        self._get_kms_key(
            context.account_id, context.region, requested_key_id, any_key_state_allowed=True
        )
        return ListKeyPoliciesResponse(PolicyNames=["default"], Truncated=False)
1✔
1477

1478
    @handler("PutKeyPolicy", expand=False)
    def put_key_policy(self, context: RequestContext, request: PutKeyPolicyRequest) -> None:
        """Replace the policy document of a key.

        The key is resolved first (any key state is accepted). Afterwards
        ``UnsupportedOperationException`` is raised unless the policy name is
        ``default``.
        """
        target_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), any_key_state_allowed=True
        )
        policy_name = request.get("PolicyName")
        if policy_name != "default":
            raise UnsupportedOperationException("Only default policy is supported")

        target_key.policy = request.get("Policy")
1✔
1486

1487
    @handler("GetKeyPolicy", expand=False)
    def get_key_policy(
        self, context: RequestContext, request: GetKeyPolicyRequest
    ) -> GetKeyPolicyResponse:
        """Return the policy document stored on a key.

        The key is resolved first (any key state is accepted). Afterwards
        ``NotFoundException`` is raised unless the policy name is ``default``.
        """
        target_key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), any_key_state_allowed=True
        )
        policy_name = request.get("PolicyName")
        if policy_name != "default":
            raise NotFoundException("No such policy exists")

        return GetKeyPolicyResponse(Policy=target_key.policy)
1✔
1497

1498
    @handler("ListResourceTags", expand=False)
    def list_resource_tags(
        self, context: RequestContext, request: ListResourceTagsRequest
    ) -> ListResourceTagsResponse:
        """Return one page of the tags attached to a key.

        The page size is taken from ``Limit`` and defaults to 50 when the request
        carries no such entry. ``Truncated`` is always part of the response;
        ``NextMarker`` only when a further page exists.
        """
        key = self._get_kms_key(
            context.account_id, context.region, request.get("KeyId"), any_key_state_allowed=True
        )
        key_tags = self._get_key_tags(context.account_id, context.region, key.metadata["Arn"])

        page, next_token = PaginatedList(key_tags).get_page(
            lambda tag: tag.get("TagKey"),
            next_token=request.get("Marker"),
            page_size=request.get("Limit", 50),
        )
        if next_token:
            return ListResourceTagsResponse(Tags=page, NextMarker=next_token, Truncated=True)
        return ListResourceTagsResponse(Tags=page, Truncated=False)
1✔
1517

1518
    @handler("RotateKeyOnDemand", expand=False)
    # TODO: return the key rotations in the ListKeyRotations operation
    def rotate_key_on_demand(
        self, context: RequestContext, request: RotateKeyOnDemandRequest
    ) -> RotateKeyOnDemandResponse:
        """Rotate a symmetric key immediately and return the key's ARN.

        Raises ``UnsupportedOperationException`` for any key spec other than
        ``SYMMETRIC_DEFAULT``. The pending-import and pending-key-material
        validations run before the rotation is triggered.
        """
        account_id, region_name, key_id = self._parse_key_id(request["KeyId"], context)
        target_key = self._get_kms_key(account_id, region_name, key_id)

        is_symmetric = target_key.metadata["KeySpec"] == KeySpec.SYMMETRIC_DEFAULT
        if not is_symmetric:
            raise UnsupportedOperationException()

        self._validate_key_state_not_pending_import(target_key)
        self._validate_external_key_has_pending_material(target_key)

        target_key.rotate_key_on_demand()

        return RotateKeyOnDemandResponse(KeyId=target_key.metadata["Arn"])
1536

1537
    @handler("TagResource", expand=False)
    def tag_resource(self, context: RequestContext, request: TagResourceRequest) -> None:
        """Attach the request's tags to a key (enabled or disabled).

        Nothing is stored when the ``Tags`` entry of the request is empty.
        """
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        new_tags = request["Tags"]
        if not new_tags:
            return

        self._set_key_tags(
            context.account_id, context.region, target_key.metadata["Arn"], new_tags
        )
1✔
1548

1549
    @handler("UntagResource", expand=False)
    def untag_resource(self, context: RequestContext, request: UntagResourceRequest) -> None:
        """Remove the tags named in ``TagKeys`` from a key (enabled or disabled)."""
        target_key = self._get_kms_key(
            context.account_id,
            context.region,
            request.get("KeyId"),
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        key_arn = target_key.metadata["Arn"]
        self._remove_key_tags(context.account_id, context.region, key_arn, request["TagKeys"])
1561

1562
    def derive_shared_secret(
        self,
        context: RequestContext,
        key_id: KeyIdType,
        key_agreement_algorithm: KeyAgreementAlgorithmSpec,
        public_key: PublicKeyType,
        grant_tokens: GrantTokenList = None,
        dry_run: NullableBooleanType = None,
        recipient: RecipientInfo = None,
        **kwargs,
    ) -> DeriveSharedSecretResponse:
        """Derive a shared secret from a KMS key and a peer's public key.

        The checks run in this order: the key usage must be ``KEY_AGREEMENT``
        (``InvalidKeyUsageException``), the algorithm must be ``ECDH``
        (``ValidationException``), and the key origin must be ``AWS_KMS`` or
        ``EXTERNAL`` (``ValueError``). The response echoes the ``key_id`` that
        was passed in.
        """
        key = self._get_kms_key(
            context.account_id,
            context.region,
            key_id,
            enabled_key_allowed=True,
            disabled_key_allowed=True,
        )
        usage = key.metadata.get("KeyUsage")
        origin = key.metadata.get("Origin")

        if usage != KeyUsageType.KEY_AGREEMENT:
            raise InvalidKeyUsageException(
                f"{key.metadata['Arn']} key usage is {usage} which is not valid for {context.operation.name}."
            )

        if key_agreement_algorithm != KeyAgreementAlgorithmSpec.ECDH:
            raise ValidationException(
                f"1 validation error detected: Value '{key_agreement_algorithm}' at 'keyAgreementAlgorithm' "
                "failed to satisfy constraint: Member must satisfy enum value set: [ECDH]"
            )

        # TODO: Verify the actual error raised
        supported_origins = (OriginType.AWS_KMS, OriginType.EXTERNAL)
        if origin not in supported_origins:
            raise ValueError(f"Key origin: {origin} is not valid for {context.operation.name}.")

        return DeriveSharedSecretResponse(
            KeyId=key_id,
            SharedSecret=key.derive_shared_secret(public_key),
            KeyAgreementAlgorithm=key_agreement_algorithm,
            KeyOrigin=origin,
        )
1605

1606
    def _validate_key_state_not_pending_import(self, key: KmsKey):
        """Raise ``KMSInvalidStateException`` if the key's state is ``PendingImport``."""
        state = key.metadata["KeyState"]
        if state == KeyState.PendingImport:
            raise KMSInvalidStateException(f"{key.metadata['Arn']} is pending import.")
1✔
1609

1610
    def _validate_external_key_has_pending_material(self, key: KmsKey):
        """Raise ``KMSInvalidStateException`` for an EXTERNAL key without pending key material.

        Keys of any other origin pass without their crypto key being inspected.
        """
        if key.metadata["Origin"] != "EXTERNAL":
            return
        if key.crypto_key.pending_key_material is not None:
            return
        raise KMSInvalidStateException(
            f"No available key material pending rotation for the key: {key.metadata['Arn']}."
        )
1615

1616
    def _validate_key_for_encryption_decryption(self, context: RequestContext, key: KmsKey):
        """Raise ``InvalidKeyUsageException`` unless the key usage is ``ENCRYPT_DECRYPT``."""
        usage = key.metadata["KeyUsage"]
        if usage == "ENCRYPT_DECRYPT":
            return
        raise InvalidKeyUsageException(
            f"{key.metadata['Arn']} key usage is {usage} which is not valid for {context.operation.name}."
        )
1622

1623
    def _validate_key_for_sign_verify(self, context: RequestContext, key: KmsKey):
        """Raise ``InvalidKeyUsageException`` unless the key usage is ``SIGN_VERIFY``."""
        usage = key.metadata["KeyUsage"]
        if usage == "SIGN_VERIFY":
            return
        raise InvalidKeyUsageException(
            f"{key.metadata['Arn']} key usage is {usage} which is not valid for {context.operation.name}."
        )
1629

1630
    def _validate_key_for_generate_verify_mac(self, context: RequestContext, key: KmsKey):
        """Raise ``InvalidKeyUsageException`` unless the key usage is ``GENERATE_VERIFY_MAC``."""
        usage = key.metadata["KeyUsage"]
        if usage == "GENERATE_VERIFY_MAC":
            return
        raise InvalidKeyUsageException(
            f"{key.metadata['Arn']} key usage is {usage} which is not valid for {context.operation.name}."
        )
1636

1637
    def _validate_mac_msg_length(self, msg: bytes):
1✔
1638
        if len(msg) > 4096:
1✔
1639
            raise ValidationException(
×
1640
                "1 validation error detected: Value at 'message' failed to satisfy constraint: "
1641
                "Member must have length less than or equal to 4096"
1642
            )
1643

1644
    def _validate_mac_algorithm(self, key: KmsKey, algorithm: str):
        """Check that a MAC algorithm is known and fits the key's spec.

        Raises ``ValidationException`` when ``algorithm`` is not an attribute of
        ``MacAlgorithmSpec``, and ``InvalidKeyUsageException`` when a three-part
        algorithm name does not correspond to the key spec.
        """
        if not hasattr(MacAlgorithmSpec, algorithm):
            raise ValidationException(
                f"1 validation error detected: Value '{algorithm}' at 'macAlgorithm' "
                "failed to satisfy constraint: Member must satisfy enum value set: "
                "[HMAC_SHA_384, HMAC_SHA_256, HMAC_SHA_224, HMAC_SHA_512]"
            )

        key_spec = key.metadata["KeySpec"]
        # An algorithm such as HMAC_SHA_256 is compared to the key spec in the form
        # HMAC_256, i.e. with the middle part of the name dropped.
        parts = algorithm.split("_")
        if len(parts) == 3 and f"{parts[0]}_{parts[2]}" != key_spec:
            raise InvalidKeyUsageException(
                f"Algorithm {algorithm} is incompatible with key spec {key_spec}."
            )
1658

1659
    def _validate_plaintext_length(self, plaintext: bytes):
1✔
1660
        if len(plaintext) > 4096:
1✔
1661
            raise ValidationException(
1✔
1662
                "1 validation error detected: Value at 'plaintext' failed to satisfy constraint: "
1663
                "Member must have length less than or equal to 4096"
1664
            )
1665

1666
    def _validate_grant_request(self, data: dict):
1✔
1667
        if "KeyId" not in data or "GranteePrincipal" not in data or "Operations" not in data:
1✔
1668
            raise ValidationError("Grant ID, key ID and grantee principal must be specified")
×
1669

1670
        for operation in data["Operations"]:
1✔
1671
            if operation not in VALID_OPERATIONS:
1✔
1672
                raise ValidationError(
×
1673
                    f"Value {['Operations']} at 'operations' failed to satisfy constraint: Member must satisfy"
1674
                    f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]"
1675
                )
1676

1677
    def _extract_attestation_pubkey(self, attestation_document: bytes) -> RSAPublicKey:
        """Load the DER-encoded public key embedded in an attestation document."""
        # The attestation document comes as a COSE (CBOR Object Signing and Encryption) object: the CBOR
        # attestation is signed and then the attestation and signature are again CBOR-encoded.  For now
        # we don't bother validating the signature, though in the future we could.
        encoded_attestation = cbor2_loads(attestation_document)[2]
        der_public_key = cbor2_loads(encoded_attestation)["public_key"]
        return load_der_public_key(der_public_key)
1✔
1685

1686
    def _decrypt_wrapped_key_material(
        self,
        import_state: KeyImportState,
        encrypted_key_material: CiphertextType,
    ) -> bytes:
        """Unwrap imported key material using the import state's wrapping algorithm.

        The key used for decryption is ``import_state.key.crypto_key.key``.
        Raises ``KMSInvalidStateException`` for a wrapping algorithm that is not
        handled here.
        """
        wrapping_algo = import_state.wrapping_algo
        unwrapping_key = import_state.key.crypto_key.key

        if wrapping_algo == AlgorithmSpec.RSAES_PKCS1_V1_5:
            return unwrapping_key.decrypt(encrypted_key_material, padding.PKCS1v15())

        if wrapping_algo == AlgorithmSpec.RSAES_OAEP_SHA_1:
            oaep_sha1 = padding.OAEP(padding.MGF1(hashes.SHA1()), hashes.SHA1(), None)
            return unwrapping_key.decrypt(encrypted_key_material, oaep_sha1)

        if wrapping_algo == AlgorithmSpec.RSAES_OAEP_SHA_256:
            oaep_sha256 = padding.OAEP(padding.MGF1(hashes.SHA256()), hashes.SHA256(), None)
            return unwrapping_key.decrypt(encrypted_key_material, oaep_sha256)

        if wrapping_algo == AlgorithmSpec.RSA_AES_KEY_WRAP_SHA_256:
            # The blob is split at the RSA key size in bytes: the leading part is
            # RSA-decrypted into an AES key, which then unwraps the remaining part.
            split_at = unwrapping_key.key_size // 8
            rsa_wrapped_aes_key = encrypted_key_material[:split_at]
            aes_wrapped_material = encrypted_key_material[split_at:]

            aes_key = unwrapping_key.decrypt(
                rsa_wrapped_aes_key,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None,
                ),
            )
            return keywrap.aes_key_unwrap_with_padding(
                aes_key, aes_wrapped_material, default_backend()
            )

        raise KMSInvalidStateException(
            f"Unsupported padding, requested wrapping algorithm: '{wrapping_algo}'"
        )
1726

1727
    def _validate_plaintext_key_type_based(
        self,
        plaintext: PlaintextType,
        key: KmsKey,
        encryption_algorithm: EncryptionAlgorithmSpec = None,
    ):
        """Check the plaintext size against the limit for the key spec and algorithm.

        The limit is 4096 bytes unless the key spec / encryption algorithm pair has
        a smaller limit in the table below. Raises ``ValidationException`` when the
        plaintext is longer than the applicable limit.
        """
        # max size values extracted from AWS boto3 documentation
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kms/client/encrypt.html
        rsa_oaep_limits = (
            (KeySpec.RSA_2048, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1, 214),
            (KeySpec.RSA_2048, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256, 190),
            (KeySpec.RSA_3072, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1, 342),
            (KeySpec.RSA_3072, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256, 318),
            (KeySpec.RSA_4096, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_1, 470),
            (KeySpec.RSA_4096, EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256, 446),
        )

        max_size_bytes = 4096  # max allowed size
        for spec, algorithm, limit in rsa_oaep_limits:
            if key.metadata["KeySpec"] == spec and encryption_algorithm == algorithm:
                max_size_bytes = limit
                break

        if len(plaintext) > max_size_bytes:
            raise ValidationException(
                f"Algorithm {encryption_algorithm} and key spec {key.metadata['KeySpec']} cannot encrypt data larger than {max_size_bytes} bytes."
            )
1771

1772

1773
# ---------------
1774
# UTIL FUNCTIONS
1775
# ---------------
1776

1777
# Different AWS services have internal integrations with KMS. Some create keys that are used to encrypt/decrypt
1778
# customer's data. Such keys can't be created from outside for security reasons. So AWS services use some internal
1779
# APIs to do that. Functions here are supposed to be used by other LocalStack services to have similar integrations
1780
# with KMS in LocalStack. As such, they are supposed to be proper APIs (as in error and security handling),
1781
# just with more features.
1782

1783

1784
def set_key_managed(key_id: str, account_id: str, region_name: str) -> None:
    """Set the ``KeyManager`` metadata entry of the given key to ``"AWS"``."""
    managed_key = KmsProvider._get_kms_key(account_id, region_name, key_id)
    managed_key.metadata.update({"KeyManager": "AWS"})
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc