• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.57
/localstack-core/localstack/services/secretsmanager/provider.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import json
1✔
5
import logging
1✔
6
import re
1✔
7
import time
1✔
8
from typing import Any, Final
1✔
9

10
import moto.secretsmanager.exceptions as moto_exception
1✔
11
from botocore.utils import InvalidArnException
1✔
12
from moto.iam.policy_validation import IAMPolicyDocumentValidator
1✔
13
from moto.secretsmanager import secretsmanager_backends
1✔
14
from moto.secretsmanager.models import FakeSecret, SecretsManagerBackend
1✔
15
from moto.secretsmanager.responses import SecretsManagerResponse
1✔
16

17
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
18
from localstack.aws.api.secretsmanager import (
1✔
19
    CancelRotateSecretRequest,
20
    CancelRotateSecretResponse,
21
    CreateSecretRequest,
22
    CreateSecretResponse,
23
    DeleteResourcePolicyRequest,
24
    DeleteResourcePolicyResponse,
25
    DeleteSecretRequest,
26
    DeleteSecretResponse,
27
    DescribeSecretRequest,
28
    DescribeSecretResponse,
29
    GetResourcePolicyRequest,
30
    GetResourcePolicyResponse,
31
    GetSecretValueRequest,
32
    GetSecretValueResponse,
33
    InvalidParameterException,
34
    InvalidRequestException,
35
    ListSecretVersionIdsRequest,
36
    ListSecretVersionIdsResponse,
37
    NameType,
38
    PutResourcePolicyRequest,
39
    PutResourcePolicyResponse,
40
    PutSecretValueRequest,
41
    PutSecretValueResponse,
42
    RemoveRegionsFromReplicationRequest,
43
    RemoveRegionsFromReplicationResponse,
44
    ReplicateSecretToRegionsRequest,
45
    ReplicateSecretToRegionsResponse,
46
    ResourceExistsException,
47
    ResourceNotFoundException,
48
    RestoreSecretRequest,
49
    RestoreSecretResponse,
50
    RotateSecretRequest,
51
    RotateSecretResponse,
52
    SecretIdType,
53
    SecretsmanagerApi,
54
    SecretVersionsListEntry,
55
    StopReplicationToReplicaRequest,
56
    StopReplicationToReplicaResponse,
57
    TagResourceRequest,
58
    UntagResourceRequest,
59
    UpdateSecretRequest,
60
    UpdateSecretResponse,
61
    UpdateSecretVersionStageRequest,
62
    UpdateSecretVersionStageResponse,
63
    ValidateResourcePolicyRequest,
64
    ValidateResourcePolicyResponse,
65
)
66
from localstack.aws.connect import connect_to
1✔
67
from localstack.services.moto import call_moto
1✔
68
from localstack.utils.aws import arns
1✔
69
from localstack.utils.patch import patch
1✔
70
from localstack.utils.time import today_no_time
1✔
71

72
# Constants.
# Staging labels used to tag secret versions.
AWSPREVIOUS: Final[str] = "AWSPREVIOUS"
AWSPENDING: Final[str] = "AWSPENDING"
AWSCURRENT: Final[str] = "AWSCURRENT"
# The maximum number of outdated versions that can be stored in the secret.
# see: https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_PutSecretValue.html
MAX_OUTDATED_SECRET_VERSIONS: Final[int] = 100
#
# Error Messages.
AWS_INVALID_REQUEST_MESSAGE_CREATE_WITH_SCHEDULED_DELETION: Final[str] = (
    "You can't create this secret because a secret with this name is already scheduled for deletion."
)

LOG = logging.getLogger(__name__)
1✔
86

87

88
class ValidationException(CommonServiceException):
    """Service exception reported with the error code ``ValidationException``."""

    def __init__(self, message: str):
        # NOTE(review): 400 and True are presumably the HTTP status code and the
        # sender-fault flag of CommonServiceException - confirm against its signature.
        super().__init__("ValidationException", message, 400, True)
1✔
91

92

93
class SecretNotFoundException(CommonServiceException):
    """Service exception with code ``ResourceNotFoundException`` and a fixed message."""

    def __init__(self):
        # NOTE(review): 400 and True are presumably the HTTP status code and the
        # sender-fault flag of CommonServiceException - confirm against its signature.
        super().__init__(
            "ResourceNotFoundException",
            "Secrets Manager can't find the specified secret.",
            400,
            True,
        )
101

102

103
class SecretsmanagerProvider(SecretsmanagerApi):
    """Secrets Manager API provider layered on top of moto.

    Each handler validates the incoming request and then either forwards it to
    moto through ``call_moto`` or calls the moto backend directly, translating
    moto exceptions into the provider's API exceptions.
    """

    def __init__(self):
        super().__init__()
        # apply_patches is defined elsewhere in this module; presumably it activates
        # the @patch-decorated moto overrides - confirm against its definition.
        apply_patches()

    @staticmethod
    def get_moto_backend_for_resource(
        name_or_arn: str, context: RequestContext
    ) -> SecretsManagerBackend:
        """Return the moto backend that owns the given secret.

        If ``name_or_arn`` parses as an ARN, the backend is selected by the
        account and region encoded in the ARN; otherwise the account and region
        of the request context are used.
        """
        try:
            arn_data = arns.parse_arn(name_or_arn)
            backend = secretsmanager_backends[arn_data["account"]][arn_data["region"]]
        except InvalidArnException:
            backend = secretsmanager_backends[context.account_id][context.region]
        return backend

    @staticmethod
    def _raise_if_default_kms_key(
        secret_id: str, request: RequestContext, backend: SecretsManagerBackend
    ):
        """Reject cross-account access to a secret that has no explicit KMS key.

        :raises ResourceNotFoundException: if the backend does not know the secret.
        :raises InvalidRequestException: if the secret has no ``kms_key_id`` and the
            requesting account differs from the secret's account.
        """
        try:
            secret = backend.describe_secret(secret_id)
        except moto_exception.SecretNotFoundException:
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
        if secret.kms_key_id is None and request.account_id != secret.account_id:
            raise InvalidRequestException(
                "You can't access a secret from a different AWS account if you encrypt the secret with the default KMS service key."
            )

    @staticmethod
    def _validate_secret_id(secret_id: SecretIdType) -> bool:
        """Return True if ``secret_id`` consists only of characters allowed in a secret name."""
        # The secret name can contain ASCII letters, numbers, and the following characters: /_+=.@-
        return bool(re.match(r"^[A-Za-z0-9/_+=.@-]+\Z", secret_id))

    @staticmethod
    def _raise_if_invalid_secret_id(secret_id: SecretIdType | NameType):
        """Raise a ValidationException if ``secret_id`` is not an ARN and not a valid name.

        Patches moto's implementation, for which secret ids are not validated.
        """
        # Anything that looks like an ARN skips the name check: parsing of the
        # ARN is delegated to the handlers.
        if secret_id.startswith("arn:"):
            return
        if not SecretsmanagerProvider._validate_secret_id(secret_id):
            raise ValidationException(
                "Invalid name. Must be a valid name containing alphanumeric "
                "characters, or any of the following: -/_+=.@!"
            )

    @staticmethod
    def _raise_if_missing_client_req_token(
        request: CreateSecretRequest
        | PutSecretValueRequest
        | RotateSecretRequest
        | UpdateSecretRequest,
    ):
        """Raise InvalidRequestException if the request has no ``ClientRequestToken`` key."""
        if "ClientRequestToken" not in request:
            raise InvalidRequestException(
                "You must provide a ClientRequestToken value. We recommend a UUID-type value."
            )

    @handler("CancelRotateSecret", expand=False)
    def cancel_rotate_secret(
        self, context: RequestContext, request: CancelRotateSecretRequest
    ) -> CancelRotateSecretResponse:
        """Validate the secret id, then forward the request to moto."""
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("CreateSecret", expand=False)
    def create_secret(
        self, context: RequestContext, request: CreateSecretRequest
    ) -> CreateSecretResponse:
        """Validate the request, then forward it to moto.

        Name validation is skipped when the request carries a tag with key
        ``BYPASS_SECRET_ID_VALIDATION``; that tag is removed before forwarding.
        """
        self._raise_if_missing_client_req_token(request)
        # Some providers need to create keys which are not usually creatable by users
        if not any(
            tag_entry["Key"] == "BYPASS_SECRET_ID_VALIDATION"
            for tag_entry in request.get("Tags", [])
        ):
            self._raise_if_invalid_secret_id(request["Name"])
        else:
            request["Tags"] = [
                tag_entry
                for tag_entry in request.get("Tags", [])
                if tag_entry["Key"] != "BYPASS_SECRET_ID_VALIDATION"
            ]

        return call_moto(context, request)

    @handler("DeleteResourcePolicy", expand=False)
    def delete_resource_policy(
        self, context: RequestContext, request: DeleteResourcePolicyRequest
    ) -> DeleteResourcePolicyResponse:
        """Validate the secret id, then forward the request to moto."""
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("DeleteSecret", expand=False)
    def delete_secret(
        self, context: RequestContext, request: DeleteSecretRequest
    ) -> DeleteSecretResponse:
        """Delete a secret through the moto backend and translate moto's exceptions.

        :raises InvalidParameterException: with moto's message, if moto rejects a parameter.
        :raises InvalidRequestException: if moto reports an invalid request.
        :raises SecretNotFoundException: if moto does not know the secret.
        """
        secret_id: str = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        recovery_window_in_days: int | None = request.get("RecoveryWindowInDays")
        force_delete_without_recovery: bool | None = request.get("ForceDeleteWithoutRecovery")

        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        try:
            arn, name, deletion_date = backend.delete_secret(
                secret_id=secret_id,
                recovery_window_in_days=recovery_window_in_days,
                force_delete_without_recovery=force_delete_without_recovery,
            )
        except moto_exception.InvalidParameterException as e:
            raise InvalidParameterException(str(e))
        except moto_exception.InvalidRequestException:
            raise InvalidRequestException(
                "You tried to perform the operation on a secret that's currently marked deleted."
            )
        except moto_exception.SecretNotFoundException:
            raise SecretNotFoundException()
        return DeleteSecretResponse(ARN=arn, Name=name, DeletionDate=deletion_date)

    @handler("DescribeSecret", expand=False)
    def describe_secret(
        self, context: RequestContext, request: DescribeSecretRequest
    ) -> DescribeSecretResponse:
        """Return the description of a secret built from the backend secret's ``to_dict()``.

        :raises ResourceNotFoundException: if moto does not know the secret.
        """
        secret_id: str = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        try:
            secret = backend.describe_secret(secret_id)
        except moto_exception.SecretNotFoundException:
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
        return DescribeSecretResponse(**secret.to_dict())

    @handler("GetResourcePolicy", expand=False)
    def get_resource_policy(
        self, context: RequestContext, request: GetResourcePolicyRequest
    ) -> GetResourcePolicyResponse:
        """Return the resource policy that the backend reports as a JSON string."""
        secret_id = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        policy = backend.get_resource_policy(secret_id)
        return GetResourcePolicyResponse(**json.loads(policy))

    @handler("GetSecretValue", expand=False)
    def get_secret_value(
        self, context: RequestContext, request: GetSecretValueRequest
    ) -> GetSecretValueResponse:
        """Fetch a secret value from the backend and translate moto's exceptions.

        When neither ``VersionId`` nor ``VersionStage`` is given, the stage
        defaults to ``AWSCURRENT``.

        :raises ResourceNotFoundException: if the secret, version or value cannot be found.
        :raises InvalidRequestException: on a stage/version mismatch, or if moto
            reports an invalid request.
        """
        secret_id = request.get("SecretId")
        version_id = request.get("VersionId")
        version_stage = request.get("VersionStage")
        if not version_id and not version_stage:
            version_stage = AWSCURRENT
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        self._raise_if_default_kms_key(secret_id, context, backend)
        try:
            response = backend.get_secret_value(secret_id, version_id, version_stage)
            response = decode_secret_binary_from_response(response)
        except moto_exception.SecretNotFoundException:
            raise ResourceNotFoundException(
                f"Secrets Manager can't find the specified secret value for staging label: {version_stage}"
            )
        except moto_exception.ResourceNotFoundException:
            error_message = (
                f"VersionId: {version_id}" if version_id else f"staging label: {version_stage}"
            )
            raise ResourceNotFoundException(
                f"Secrets Manager can't find the specified secret value for {error_message}"
            )
        except moto_exception.SecretStageVersionMismatchException:
            raise InvalidRequestException(
                "You provided a VersionStage that is not associated to the provided VersionId."
            )
        except moto_exception.SecretHasNoValueException:
            raise ResourceNotFoundException(
                f"Secrets Manager can't find the specified secret value for staging label: {version_stage}"
            )
        except moto_exception.InvalidRequestException:
            raise InvalidRequestException(
                "You can't perform this operation on the secret because it was marked for deletion."
            )
        return GetSecretValueResponse(**response)

    @handler("ListSecretVersionIds", expand=False)
    def list_secret_version_ids(
        self, context: RequestContext, request: ListSecretVersionIdsRequest
    ) -> ListSecretVersionIdsResponse:
        """List the versions of a secret; ``IncludeDeprecated`` defaults to False."""
        secret_id = request["SecretId"]
        include_deprecated = request.get("IncludeDeprecated", False)
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        secrets = backend.list_secret_version_ids(secret_id, include_deprecated=include_deprecated)
        return ListSecretVersionIdsResponse(**json.loads(secrets))

    @handler("PutResourcePolicy", expand=False)
    def put_resource_policy(
        self, context: RequestContext, request: PutResourcePolicyRequest
    ) -> PutResourcePolicyResponse:
        """Attach the given resource policy to the secret through the backend."""
        secret_id = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        arn, name = backend.put_resource_policy(secret_id, request["ResourcePolicy"])
        return PutResourcePolicyResponse(ARN=arn, Name=name)

    @handler("PutSecretValue", expand=False)
    def put_secret_value(
        self, context: RequestContext, request: PutSecretValueRequest
    ) -> PutSecretValueResponse:
        """Store a new secret value through the backend.

        ``VersionStages`` defaults to ``[AWSCURRENT]``; a non-list value is
        wrapped in a list.

        :raises InvalidRequestException: if the client request token is missing,
            or if neither ``SecretString`` nor ``SecretBinary`` is provided.
        """
        secret_id = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        self._raise_if_missing_client_req_token(request)
        client_req_token = request.get("ClientRequestToken")
        secret_string = request.get("SecretString")
        secret_binary = request.get("SecretBinary")
        if not secret_binary and not secret_string:
            raise InvalidRequestException("You must provide either SecretString or SecretBinary.")

        version_stages = request.get("VersionStages", [AWSCURRENT])
        if not isinstance(version_stages, list):
            version_stages = [version_stages]

        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        self._raise_if_default_kms_key(secret_id, context, backend)

        response = backend.put_secret_value(
            secret_id=secret_id,
            secret_binary=secret_binary,
            secret_string=secret_string,
            version_stages=version_stages,
            client_request_token=client_req_token,
        )
        return PutSecretValueResponse(**json.loads(response))

    @handler("RemoveRegionsFromReplication", expand=False)
    def remove_regions_from_replication(
        self, context: RequestContext, request: RemoveRegionsFromReplicationRequest
    ) -> RemoveRegionsFromReplicationResponse:
        """Validate the secret id, then forward the request to moto."""
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("ReplicateSecretToRegions", expand=False)
    def replicate_secret_to_regions(
        self, context: RequestContext, request: ReplicateSecretToRegionsRequest
    ) -> ReplicateSecretToRegionsResponse:
        """Validate the secret id, then forward the request to moto."""
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("RestoreSecret", expand=False)
    def restore_secret(
        self, context: RequestContext, request: RestoreSecretRequest
    ) -> RestoreSecretResponse:
        """Restore a secret through the backend.

        :raises ResourceNotFoundException: if moto does not know the secret.
        """
        secret_id = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        try:
            arn, name = backend.restore_secret(secret_id)
        except moto_exception.SecretNotFoundException:
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
        return RestoreSecretResponse(ARN=arn, Name=name)

    @handler("RotateSecret", expand=False)
    def rotate_secret(
        self, context: RequestContext, request: RotateSecretRequest
    ) -> RotateSecretResponse:
        """Require a client request token and a valid secret id, then forward to moto."""
        self._raise_if_missing_client_req_token(request)
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("StopReplicationToReplica", expand=False)
    def stop_replication_to_replica(
        self, context: RequestContext, request: StopReplicationToReplicaRequest
    ) -> StopReplicationToReplicaResponse:
        """Validate the secret id, then forward the request to moto."""
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("TagResource", expand=False)
    def tag_resource(self, context: RequestContext, request: TagResourceRequest) -> None:
        """Validate the secret id, then apply the request's tags through the backend."""
        secret_id = request["SecretId"]
        tags = request["Tags"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        backend.tag_resource(secret_id, tags)

    @handler("UntagResource", expand=False)
    def untag_resource(self, context: RequestContext, request: UntagResourceRequest) -> None:
        """Validate the secret id, then remove the given tag keys through the backend."""
        secret_id = request["SecretId"]
        tag_keys = request.get("TagKeys")
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        backend.untag_resource(secret_id=secret_id, tag_keys=tag_keys)

    @handler("UpdateSecret", expand=False)
    def update_secret(
        self, context: RequestContext, request: UpdateSecretRequest
    ) -> UpdateSecretResponse:
        """Update a secret through the backend and translate moto's exceptions.

        :raises ResourceNotFoundException: if moto does not know the secret.
        :raises InvalidRequestException: if the client request token is missing,
            the secret is a replica, or moto reports an invalid request.
        """
        # if we're modifying the value of the secret, ClientRequestToken is required
        # NOTE(review): the token check below runs unconditionally, not only when
        # the secret value is modified - confirm which behavior is intended.
        secret_id = request["SecretId"]
        secret_string = request.get("SecretString")
        secret_binary = request.get("SecretBinary")
        description = request.get("Description")
        kms_key_id = request.get("KmsKeyId")
        client_req_token = request.get("ClientRequestToken")
        self._raise_if_invalid_secret_id(secret_id)
        self._raise_if_missing_client_req_token(request)

        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        try:
            secret = backend.update_secret(
                secret_id,
                description=description,
                secret_string=secret_string,
                secret_binary=secret_binary,
                client_request_token=client_req_token,
                kms_key_id=kms_key_id,
            )
        except moto_exception.SecretNotFoundException:
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
        except moto_exception.OperationNotPermittedOnReplica:
            raise InvalidRequestException(
                "Operation not permitted on a replica secret. Call must be made in primary secret's region."
            )
        except moto_exception.InvalidRequestException:
            raise InvalidRequestException(
                "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
                "You can't perform this operation on the secret because it was marked for deletion."
            )
        return UpdateSecretResponse(**json.loads(secret))

    @handler("UpdateSecretVersionStage", expand=False)
    def update_secret_version_stage(
        self, context: RequestContext, request: UpdateSecretVersionStageRequest
    ) -> UpdateSecretVersionStageResponse:
        """Validate the secret id, then forward the request to moto."""
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("ValidateResourcePolicy", expand=False)
    def validate_resource_policy(
        self, context: RequestContext, request: ValidateResourcePolicyRequest
    ) -> ValidateResourcePolicyResponse:
        """Validate the secret id, then forward the request to moto."""
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)
×
443

444

445
@patch(FakeSecret.__init__)
def fake_secret__init__(fn, self, *args, **kwargs):
    """Run the original ``FakeSecret.__init__`` (``fn``), then adjust the new instance.

    Timestamps passed as keyword arguments are replaced by the current time so
    that they include milliseconds, and the access/rotation fields are reset.
    """
    fn(self, *args, **kwargs)

    # Fix time not including millis.
    time_now = time.time()
    if kwargs.get("last_changed_date"):
        self.last_changed_date = time_now
    if kwargs.get("created_date"):
        self.created_date = time_now

    # The last date that the secret value was retrieved.
    # This value does not include the time.
    # This field is omitted if the secret has never been retrieved.
    self.last_accessed_date = None
    # Results in RotationEnabled being returned only if rotation was ever overwritten,
    # in which case this field is non-null, but an integer.
    self.auto_rotate_after_days = None
    self.rotation_lambda_arn = None
1✔
464

465

466
@patch(FakeSecret.update)
def fake_secret_update(
    fn, self, description=None, tags=None, kms_key_id=None, last_changed_date=None
):
    """Run the original ``FakeSecret.update`` (``fn``), then refresh the change date.

    When a ``last_changed_date`` was supplied, it is overwritten with the
    current time rounded to milliseconds.
    """
    fn(self, description, tags, kms_key_id, last_changed_date)
    if last_changed_date is not None:
        self.last_changed_date = round(time.time(), 3)
1✔
473

474

475
@patch(SecretsManagerBackend.get_secret_value)
def moto_smb_get_secret_value(fn, self, secret_id, version_id, version_stage):
    """Run the original ``get_secret_value`` (``fn``) and record the access date.

    Returns the original result unchanged.
    """
    res = fn(self, secret_id, version_id, version_stage)

    secret = self.secrets[secret_id]

    # Patch: update last accessed date on get.
    secret.last_accessed_date = today_no_time()

    # Patch: update version's last accessed date.
    # Falls back to the secret's default version when no version id was requested.
    secret_version = secret.versions.get(version_id or secret.default_version_id)
    if secret_version:
        secret_version["last_accessed_date"] = secret.last_accessed_date

    return res
1✔
490

491

492
@patch(SecretsManagerBackend.create_secret)
def moto_smb_create_secret(fn, self, name, *args, **kwargs):
    """Guard the original ``create_secret`` (``fn``) against name collisions.

    :raises InvalidRequestException: if a secret with this name has a deletion date set.
    :raises ResourceExistsException: if a secret with this name already exists.
    """
    # Creating a secret with a SecretId equal to one that is scheduled for
    # deletion should raise an 'InvalidRequestException'.
    secret: FakeSecret | None = self.secrets.get(name)
    if secret is not None and secret.deleted_date is not None:
        raise InvalidRequestException(AWS_INVALID_REQUEST_MESSAGE_CREATE_WITH_SCHEDULED_DELETION)

    if name in self.secrets:
        raise ResourceExistsException(
            f"The operation failed because the secret {name} already exists."
        )

    return fn(self, name, *args, **kwargs)
1✔
506

507

508
@patch(SecretsManagerBackend.list_secret_version_ids)
def moto_smb_list_secret_version_ids(
    _, self, secret_id: str, include_deprecated: bool, *args, **kwargs
):
    """Replace moto's ``list_secret_version_ids``; the original function is not called.

    Builds the version list from the stored versions, newest ``CreatedDate``
    first, and returns the response serialized as a JSON string.

    :raises SecretNotFoundException: if the secret is unknown.
    :raises InvalidRequestException: if the secret is marked as deleted.
    """
    if secret_id not in self.secrets:
        raise SecretNotFoundException()

    if self.secrets[secret_id].is_deleted():
        # NOTE(review): the message names the UpdateSecret operation although this
        # is the ListSecretVersionIds path - looks copied; confirm the intended text.
        raise InvalidRequestException(
            "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
            "You can't perform this operation on the secret because it was marked for deletion."
        )

    secret = self.secrets[secret_id]

    # Patch: output format, report exact createdate instead of current time.
    versions: list[SecretVersionsListEntry] = []
    for version_id, version in secret.versions.items():
        version_stages = version["version_stages"]
        # Patch: include deprecated versions if include_deprecated is True.
        # version_stages is empty if the version is deprecated.
        # see: https://docs.aws.amazon.com/secretsmanager/latest/userguide/getting-started.html#term_version
        if len(version_stages) > 0 or include_deprecated:
            entry = SecretVersionsListEntry(
                CreatedDate=version["createdate"],
                VersionId=version_id,
            )

            if version_stages:
                entry["VersionStages"] = version_stages

            # Patch: bind LastAccessedDate if one exists for this version.
            last_accessed_date = version.get("last_accessed_date")
            if last_accessed_date:
                entry["LastAccessedDate"] = last_accessed_date

            versions.append(entry)

    # Patch: sort versions by date.
    versions.sort(key=lambda v: v["CreatedDate"], reverse=True)

    response = ListSecretVersionIdsResponse(ARN=secret.arn, Name=secret.name, Versions=versions)

    return json.dumps(response)
1✔
552

553

554
@patch(FakeSecret.to_dict)
def fake_secret_to_dict(fn, self):
    """Post-process the dict produced by the original ``FakeSecret.to_dict`` (``fn``).

    Adds ``LastAccessedDate`` when set, drops ``Description``,
    ``RotationEnabled``, ``RotationRules`` and ``Tags`` when the corresponding
    attribute is unset, and finally removes every key whose value is None.
    """
    res_dict = fn(self)
    if self.last_accessed_date:
        res_dict["LastAccessedDate"] = self.last_accessed_date
    if not self.description and "Description" in res_dict:
        del res_dict["Description"]
    if not self.rotation_enabled and "RotationEnabled" in res_dict:
        del res_dict["RotationEnabled"]
    if self.auto_rotate_after_days is None and "RotationRules" in res_dict:
        del res_dict["RotationRules"]
    if self.tags is None and "Tags" in res_dict:
        del res_dict["Tags"]
    # Collect the keys first so the dict is not mutated while it is iterated.
    for null_field in [key for key, value in res_dict.items() if value is None]:
        del res_dict[null_field]
    return res_dict
1✔
570

571

572
@patch(SecretsManagerBackend.update_secret)
def backend_update_secret(
    fn,
    self,
    secret_id,
    description=None,
    **kwargs,
):
    """Wrap the original ``update_secret`` (``fn``) and build the response here.

    The original is only invoked when ``kms_key_id``, ``secret_binary`` or
    ``secret_string`` is given; the description is set directly on the secret.
    Returns the response as a JSON string; ``VersionId`` is included only if
    the secret's default version changed.

    :raises SecretNotFoundException: if the secret is unknown.
    :raises InvalidRequestException: if the secret is marked as deleted.
    """
    if secret_id not in self.secrets:
        raise SecretNotFoundException()

    if self.secrets[secret_id].is_deleted():
        raise InvalidRequestException(
            "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
            "You can't perform this operation on the secret because it was marked for deletion."
        )

    secret = self.secrets[secret_id]
    # Default version before the update, compared below to detect a new version.
    version_id_t0 = secret.default_version_id

    requires_new_version: bool = any(
        [kwargs.get("kms_key_id"), kwargs.get("secret_binary"), kwargs.get("secret_string")]
    )
    if requires_new_version:
        fn(self, secret_id, **kwargs)

    if description is not None:
        secret.description = description

    version_id_t1 = secret.default_version_id

    resp: UpdateSecretResponse = UpdateSecretResponse()
    resp["ARN"] = secret.arn
    resp["Name"] = secret.name

    if version_id_t0 != version_id_t1:
        resp["VersionId"] = version_id_t1

    return json.dumps(resp)
1✔
611

612

613
@patch(SecretsManagerResponse.update_secret, pass_target=False)
def response_update_secret(self):
    """Replace moto's ``SecretsManagerResponse.update_secret``.

    Reads the request parameters and passes them, including ``Description``,
    to the backend's ``update_secret``, returning its result.
    """
    secret_id = self._get_param("SecretId")
    description = self._get_param("Description")
    secret_string = self._get_param("SecretString")
    secret_binary = self._get_param("SecretBinary")
    client_request_token = self._get_param("ClientRequestToken")
    kms_key_id = self._get_param("KmsKeyId")
    return self.backend.update_secret(
        secret_id=secret_id,
        description=description,
        secret_string=secret_string,
        secret_binary=secret_binary,
        client_request_token=client_request_token,
        kms_key_id=kms_key_id,
    )
629

630

631
@patch(SecretsManagerBackend.update_secret_version_stage)
def backend_update_secret_version_stage(
    fn, self, secret_id, version_stage, remove_from_version_id, move_to_version_id
):
    """Run the original ``update_secret_version_stage`` (``fn``), then tidy the versions.

    After the original call: the default version follows ``AWSCURRENT``, stale
    ``AWSPREVIOUS`` labels are removed, and versions left without any stage
    are deleted. Returns the secret's ARN and name.
    """
    fn(self, secret_id, version_stage, remove_from_version_id, move_to_version_id)

    secret = self.secrets[secret_id]

    # Patch: default version is the new AWSCURRENT version
    if version_stage == AWSCURRENT:
        secret.default_version_id = move_to_version_id

    versions_no_stages = []
    for version_id, version in secret.versions.items():
        version_stages = version["version_stages"]

        # moto appends a new AWSPREVIOUS label to the version AWSCURRENT was removed from,
        # but it does not remove the old AWSPREVIOUS label.
        # Patch: ensure only one AWSPREVIOUS tagged version is in the pool.
        if (
            version_stage == AWSCURRENT
            and version_id != remove_from_version_id
            and AWSPREVIOUS in version_stages
        ):
            version_stages.remove(AWSPREVIOUS)

        if not version_stages:
            versions_no_stages.append(version_id)

    # Patch: remove secret versions with no version stages.
    # Deletion happens after the loop so the dict is not mutated while iterated.
    for version_no_stages in versions_no_stages:
        del secret.versions[version_no_stages]

    return secret.arn, secret.name
1✔
665

666

667
@patch(FakeSecret.reset_default_version)
def fake_secret_reset_default_version(fn, self, secret_version, version_id):
    """Run the original ``reset_default_version`` (``fn``), then cap outdated versions.

    Versions without any version stage count as outdated. When more than
    ``MAX_OUTDATED_SECRET_VERSIONS`` of them exist, the surplus is deleted,
    starting from the front of ``self.versions``' iteration order.
    """
    fn(self, secret_version, version_id)

    # Use a distinct loop name so the ``version_id`` parameter is not shadowed.
    outdated_version_ids: list[str] = [
        vid for vid, version in self.versions.items() if not version["version_stages"]
    ]

    # Patch: remove outdated versions if the max deprecated versions limit is exceeded.
    excess = len(outdated_version_ids) - MAX_OUTDATED_SECRET_VERSIONS
    if excess > 0:
        for outdated_version_id in outdated_version_ids[:excess]:
            del self.versions[outdated_version_id]
1✔
685

686

687
@patch(FakeSecret.remove_version_stages_from_old_versions)
def fake_secret_remove_version_stages_from_old_versions(fn, self, version_stages):
    """Call moto's implementation, then delete versions left without any stage."""
    fn(self, version_stages)

    # Iterate over a snapshot so entries can be deleted while scanning.
    for candidate_id, candidate in list(self.versions.items()):
        if not candidate["version_stages"]:
            del self.versions[candidate_id]
1✔
696

697

698
# Moto does not support rotate_immediately as an API parameter while the AWS API does
@patch(SecretsManagerResponse.rotate_secret, pass_target=False)
def rotate_secret(self) -> str:
    """Handle RotateSecret by forwarding the request parameters to the backend.

    ``RotateImmediately`` is forwarded as ``True`` when the request omits it.
    """
    token = self._get_param("ClientRequestToken")
    lambda_arn = self._get_param("RotationLambdaARN")
    rules = self._get_param("RotationRules")
    immediate = self._get_param("RotateImmediately")
    target_secret = self._get_param("SecretId")

    # A missing flag means "rotate now"; an explicit value is passed through untouched.
    if immediate is None:
        immediate = True

    return self.backend.rotate_secret(
        secret_id=target_secret,
        client_request_token=token,
        rotation_lambda_arn=lambda_arn,
        rotation_rules=rules,
        rotate_immediately=immediate,
    )
713

714

715
@patch(SecretsManagerBackend.rotate_secret)
def backend_rotate_secret(
    _,
    self,
    secret_id,
    client_request_token=None,
    rotation_lambda_arn=None,
    rotation_rules=None,
    rotate_immediately=True,
):
    """Replacement for moto's ``rotate_secret`` that drives the rotation Lambda.

    The patched moto method (first argument) is intentionally not called.

    :param secret_id: identifier of the secret to rotate
    :param client_request_token: token used to derive the new version id
    :param rotation_lambda_arn: rotation function; falls back to the ARN already
        stored on the secret when omitted
    :param rotation_rules: optional dict; only ``AutomaticallyAfterDays`` is read
    :param rotate_immediately: when truthy, the four rotation steps are invoked on
        the Lambda right away; otherwise only the rotation settings are stored
    :return: the result of ``secret.to_short_dict`` for the relevant version id
    :raises SecretNotFoundException: if ``secret_id`` is not a valid identifier
    :raises InvalidRequestException: if the secret is marked deleted, no rotation
        Lambda is known, or a previous rotation is still pending and the Lambda
        invocation fails again
    :raises InvalidParameterException: if the ARN is longer than 2048 characters or
        ``AutomaticallyAfterDays`` is outside 1-1000
    :raises ResourceNotFoundException: if the rotation Lambda cannot be fetched
    """
    rotation_days = "AutomaticallyAfterDays"

    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    secret = self.secrets[secret_id]
    if secret.is_deleted():
        # Implicit string concatenation keeps source indentation out of the message.
        raise InvalidRequestException(
            "An error occurred (InvalidRequestException) when calling the RotateSecret operation: "
            "You tried to perform the operation on a secret that's currently marked deleted."
        )

    # Resolve rotation_lambda_arn and fall back to the previous value if it is missing
    # from the current request.
    rotation_lambda_arn = rotation_lambda_arn or secret.rotation_lambda_arn
    if not rotation_lambda_arn:
        raise InvalidRequestException(
            "No Lambda rotation function ARN is associated with this secret."
        )
    if len(rotation_lambda_arn) > 2048:
        raise InvalidParameterException("RotationLambdaARN must <= 2048 characters long.")

    # In case a rotation period is not provided, fall back to the value already stored
    # on the secret.
    rotation_period = secret.auto_rotate_after_days or 0
    if rotation_rules and rotation_days in rotation_rules:
        rotation_period = rotation_rules[rotation_days]
        if rotation_period < 1 or rotation_period > 1000:
            raise InvalidParameterException(
                "RotationRules.AutomaticallyAfterDays must be within 1-1000."
            )

    try:
        lm_client = connect_to(region_name=self.region_name).lambda_
        lm_client.get_function(FunctionName=rotation_lambda_arn)
    except Exception:
        raise ResourceNotFoundException("Lambda does not exist or could not be accessed")

    # The rotation function must end with the versions of the secret in
    # one of two states:
    #
    #  - The AWSPENDING and AWSCURRENT staging labels are attached to the
    #    same version of the secret, or
    #  - The AWSPENDING staging label is not attached to any version of the secret.
    #
    # If the AWSPENDING staging label is present but not attached to the same
    # version as AWSCURRENT then any later invocation of RotateSecret assumes
    # that a previous rotation request is still in progress and returns an error.
    pending_entry = next(
        (entry for entry in secret.versions.values() if AWSPENDING in entry["version_stages"]),
        None,
    )
    in_progress_error = None
    if pending_entry is not None and AWSCURRENT not in pending_entry["version_stages"]:
        # Delay the exception, so the lambda can be triggered again first.
        in_progress_error = InvalidRequestException(
            "Previous rotation request is still in progress."
        )

    secret.rotation_lambda_arn = rotation_lambda_arn
    secret.auto_rotate_after_days = rotation_period
    if secret.auto_rotate_after_days > 0:
        wait_interval_s = int(rotation_period) * 86400
        secret.next_rotation_date = int(time.time()) + wait_interval_s
        secret.rotation_enabled = True
        secret.rotation_requested = True

    # Without an immediate rotation no new version is created, so report the secret's
    # current default version instead of reading an unbound local further down.
    # NOTE(review): confirm this matches the VersionId AWS returns for
    # RotateImmediately=false.
    new_version_id = getattr(secret, "default_version_id", None)

    if rotate_immediately:
        if in_progress_error is None:
            # Begin the rotation process for the given secret by invoking the lambda function.
            #
            # We add the new secret version as "pending". The previous version remains
            # as "current" for now. Once we've passed the new secret through the lambda
            # rotation function (if provided) we can then update the status to "current".
            new_version_id = self._from_client_request_token(client_request_token)

            # An initial dummy secret value is necessary otherwise moto is not adding the new
            # secret version.
            self._add_secret(
                secret_id,
                "dummy_password",
                description=secret.description,
                tags=secret.tags,
                version_id=new_version_id,
                version_stages=[AWSPENDING],
            )

            # AWS secret rotation function templates have checks on existing values so we remove
            # the dummy value to force the lambda to generate a new one.
            del secret.versions[new_version_id]["secret_string"]
        else:
            # Retry the rotation that is still pending, reusing its version id.
            new_version_id = pending_entry["version_id"]

        try:
            for step in ["create", "set", "test", "finish"]:
                resp = lm_client.invoke(
                    FunctionName=rotation_lambda_arn,
                    Payload=json.dumps(
                        {
                            "Step": step + "Secret",
                            "SecretId": secret.name,
                            "ClientRequestToken": new_version_id,
                        }
                    ),
                )
                if resp.get("FunctionError"):
                    data = json.loads(resp.get("Payload").read())
                    raise Exception(data.get("errorType"))
        except Exception as e:
            LOG.debug("An exception (%s) has occurred in %s", str(e), rotation_lambda_arn)
            if in_progress_error is not None:
                raise in_progress_error
            # Fall through if there is no previously pending version so we'll "stuck" with a new
            # secret version in AWSPENDING state.
    secret.last_rotation_date = int(time.time())
    return secret.to_short_dict(version_id=new_version_id)
1✔
848

849

850
@patch(moto_exception.SecretNotFoundException.__init__)
def moto_secret_not_found_exception_init(fn, self):
    """Run moto's original initializer, then set the exception's ``code`` to 400."""
    fn(self)
    # Patch: override whatever code the original initializer assigned.
    setattr(self, "code", 400)
1✔
854

855

856
@patch(FakeSecret._form_version_ids_to_stages, pass_target=False)
def _form_version_ids_to_stages_modal(self):
    """Map each version id to its staging labels, skipping versions that have none."""
    # Patch: a version only appears in the response if its stage list is non-empty.
    return {
        version_id: details["version_stages"]
        for version_id, details in self.versions.items()
        if details["version_stages"]
    }
1✔
864

865

866
# patching resource policy in moto
867
def get_resource_policy_model(self, secret_id):
    """Return a JSON string with the secret's ARN, name and (if set) resource policy.

    Raises ``SecretNotFoundException`` when ``secret_id`` is not a valid identifier.
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    secret = self.secrets[secret_id]
    payload = {"ARN": secret.arn, "Name": secret.secret_id}

    # The policy attribute may be missing entirely; falsy policies are left out.
    attached_policy = getattr(secret, "policy", None)
    if attached_policy:
        payload["ResourcePolicy"] = attached_policy
    return json.dumps(payload)
×
879

880

881
def get_resource_policy_response(self):
    """Handle GetResourcePolicy: read ``SecretId`` and delegate to the backend."""
    requested_id = self._get_param("SecretId")
    backend = self.backend
    return backend.get_resource_policy(secret_id=requested_id)
×
884

885

886
def decode_secret_binary_from_response(response: dict[str, Any]):
    """Base64-decode ``response["SecretBinary"]`` in place, if that key is present.

    The same dict object is returned, whether or not it was modified.
    """
    if "SecretBinary" not in response:
        return response

    encoded = response["SecretBinary"]
    response["SecretBinary"] = base64.b64decode(encoded)
    return response
1✔
891

892

893
def delete_resource_policy_model(self, secret_id):
    """Clear the secret's resource policy and return its ARN and name as JSON.

    Raises ``SecretNotFoundException`` when ``secret_id`` is not a valid identifier.
    """
    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    secret = self.secrets[secret_id]
    secret.policy = None
    return json.dumps({"ARN": secret.arn, "Name": secret.secret_id})
×
904

905

906
def delete_resource_policy_response(self):
    """Handle DeleteResourcePolicy: read ``SecretId`` and delegate to the backend."""
    requested_id = self._get_param("SecretId")
    backend = self.backend
    return backend.delete_resource_policy(secret_id=requested_id)
×
909

910

911
def put_resource_policy_model(self, secret_id, resource_policy):
    """Validate ``resource_policy``, attach it to the secret, and return ARN/name as JSON.

    The policy document's top-level elements and version syntax are validated before
    the secret identifier is checked. Raises ``SecretNotFoundException`` when
    ``secret_id`` is not a valid identifier.
    """
    validator = IAMPolicyDocumentValidator(resource_policy)
    validator._validate_top_elements()
    validator._validate_version_syntax()

    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    secret = self.secrets[secret_id]
    secret.policy = resource_policy
    return json.dumps({"ARN": secret.arn, "Name": secret.secret_id})
×
925

926

927
def put_resource_policy_response(self):
    """Handle PutResourcePolicy: parse the JSON policy and delegate to the backend."""
    requested_id = self._get_param("SecretId")
    raw_policy = self._get_param("ResourcePolicy")
    backend = self.backend
    # The backend receives the policy as a parsed document, not as the raw JSON string.
    parsed_policy = json.loads(raw_policy)
    return backend.put_resource_policy(secret_id=requested_id, resource_policy=parsed_policy)
933

934

935
def apply_patches():
    """Install the resource-policy handlers on moto's backend and response classes.

    The ``get_resource_policy`` handlers are always replaced. The delete/put handlers
    are only installed when the class does not already define an attribute of that name.
    """
    SecretsManagerBackend.get_resource_policy = get_resource_policy_model
    SecretsManagerResponse.get_resource_policy = get_resource_policy_response

    # (target class, attribute name, implementation) for handlers added only if absent.
    fallback_handlers = (
        (SecretsManagerBackend, "delete_resource_policy", delete_resource_policy_model),
        (SecretsManagerResponse, "delete_resource_policy", delete_resource_policy_response),
        (SecretsManagerBackend, "put_resource_policy", put_resource_policy_model),
        (SecretsManagerResponse, "put_resource_policy", put_resource_policy_response),
    )
    for target_cls, attr_name, implementation in fallback_handlers:
        if not hasattr(target_cls, attr_name):
            setattr(target_cls, attr_name, implementation)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc