• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21708052316

05 Feb 2026 10:30AM UTC coverage: 86.873% (-0.09%) from 86.962%
21708052316

push

github

web-flow
Deprecate TaggingService (#13697)

2 of 2 new or added lines in 1 file covered. (100.0%)

158 existing lines in 15 files now uncovered.

69932 of 80499 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.53
/localstack-core/localstack/services/secretsmanager/provider.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import json
1✔
5
import logging
1✔
6
import re
1✔
7
import time
1✔
8
from typing import Any, Final
1✔
9

10
import moto.secretsmanager.exceptions as moto_exception
1✔
11
from botocore.utils import InvalidArnException
1✔
12
from moto.iam.policy_validation import IAMPolicyDocumentValidator
1✔
13
from moto.secretsmanager import secretsmanager_backends
1✔
14
from moto.secretsmanager.models import FakeSecret, SecretsManagerBackend
1✔
15
from moto.secretsmanager.responses import SecretsManagerResponse
1✔
16

17
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
18
from localstack.aws.api.secretsmanager import (
1✔
19
    CancelRotateSecretRequest,
20
    CancelRotateSecretResponse,
21
    CreateSecretRequest,
22
    CreateSecretResponse,
23
    DeleteResourcePolicyRequest,
24
    DeleteResourcePolicyResponse,
25
    DeleteSecretRequest,
26
    DeleteSecretResponse,
27
    DescribeSecretRequest,
28
    DescribeSecretResponse,
29
    GetResourcePolicyRequest,
30
    GetResourcePolicyResponse,
31
    GetSecretValueRequest,
32
    GetSecretValueResponse,
33
    InvalidParameterException,
34
    InvalidRequestException,
35
    ListSecretVersionIdsRequest,
36
    ListSecretVersionIdsResponse,
37
    NameType,
38
    PutResourcePolicyRequest,
39
    PutResourcePolicyResponse,
40
    PutSecretValueRequest,
41
    PutSecretValueResponse,
42
    RemoveRegionsFromReplicationRequest,
43
    RemoveRegionsFromReplicationResponse,
44
    ReplicateSecretToRegionsRequest,
45
    ReplicateSecretToRegionsResponse,
46
    ResourceExistsException,
47
    ResourceNotFoundException,
48
    RestoreSecretRequest,
49
    RestoreSecretResponse,
50
    RotateSecretRequest,
51
    RotateSecretResponse,
52
    SecretIdType,
53
    SecretsmanagerApi,
54
    SecretVersionsListEntry,
55
    StopReplicationToReplicaRequest,
56
    StopReplicationToReplicaResponse,
57
    TagResourceRequest,
58
    UntagResourceRequest,
59
    UpdateSecretRequest,
60
    UpdateSecretResponse,
61
    UpdateSecretVersionStageRequest,
62
    UpdateSecretVersionStageResponse,
63
    ValidateResourcePolicyRequest,
64
    ValidateResourcePolicyResponse,
65
)
66
from localstack.aws.connect import connect_to
1✔
67
from localstack.services.moto import call_moto
1✔
68
from localstack.state import StateVisitor
1✔
69
from localstack.utils.aws import arns
1✔
70
from localstack.utils.patch import patch
1✔
71
from localstack.utils.time import today_no_time
1✔
72

73
# Constants.
74
AWSPREVIOUS: Final[str] = "AWSPREVIOUS"
1✔
75
AWSPENDING: Final[str] = "AWSPENDING"
1✔
76
AWSCURRENT: Final[str] = "AWSCURRENT"
1✔
77
# The maximum number of outdated versions that can be stored in the secret.
78
# see: https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_PutSecretValue.html
79
MAX_OUTDATED_SECRET_VERSIONS: Final[int] = 100
1✔
80
#
81
# Error Messages.
82
AWS_INVALID_REQUEST_MESSAGE_CREATE_WITH_SCHEDULED_DELETION: Final[str] = (
1✔
83
    "You can't create this secret because a secret with this name is already scheduled for deletion."
84
)
85

86
LOG = logging.getLogger(__name__)
1✔
87

88

89
class ValidationException(CommonServiceException):
1✔
90
    def __init__(self, message: str):
1✔
91
        super().__init__("ValidationException", message, 400, True)
1✔
92

93

94
class SecretNotFoundException(CommonServiceException):
1✔
95
    def __init__(self):
1✔
96
        super().__init__(
1✔
97
            "ResourceNotFoundException",
98
            "Secrets Manager can't find the specified secret.",
99
            400,
100
            True,
101
        )
102

103

104
class SecretsmanagerProvider(SecretsmanagerApi):
1✔
105
    def __init__(self):
1✔
106
        super().__init__()
1✔
107
        apply_patches()
1✔
108

109
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
110
        visitor.visit(secretsmanager_backends)
×
111

112
    @staticmethod
1✔
113
    def get_moto_backend_for_resource(
1✔
114
        name_or_arn: str, context: RequestContext
115
    ) -> SecretsManagerBackend:
116
        try:
1✔
117
            arn_data = arns.parse_arn(name_or_arn)
1✔
118
            backend = secretsmanager_backends[arn_data["account"]][arn_data["region"]]
1✔
119
        except InvalidArnException:
1✔
120
            backend = secretsmanager_backends[context.account_id][context.region]
1✔
121
        return backend
1✔
122

123
    @staticmethod
1✔
124
    def _raise_if_default_kms_key(
1✔
125
        secret_id: str, request: RequestContext, backend: SecretsManagerBackend
126
    ):
127
        try:
1✔
128
            secret = backend.describe_secret(secret_id)
1✔
129
        except moto_exception.SecretNotFoundException:
1✔
130
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
1✔
131
        if secret.kms_key_id is None and request.account_id != secret.account_id:
1✔
132
            raise InvalidRequestException(
1✔
133
                "You can't access a secret from a different AWS account if you encrypt the secret with the default KMS service key."
134
            )
135

136
    @staticmethod
1✔
137
    def _validate_secret_id(secret_id: SecretIdType) -> bool:
1✔
138
        # The secret name can contain ASCII letters, numbers, and the following characters: /_+=.@-
139
        return bool(re.match(r"^[A-Za-z0-9/_+=.@-]+\Z", secret_id))
1✔
140

141
    @staticmethod
1✔
142
    def _raise_if_invalid_secret_id(secret_id: SecretIdType | NameType):
1✔
143
        # Patches moto's implementation for which secret_ids are not validated, by raising a ValidationException.
144
        # Skips this check if the secret_id provided appears to be an arn (starting with 'arn:').
145
        if not re.match(
1✔
146
            r"^arn:", secret_id
147
        ):  # Check if it appears to be an arn: so to skip secret_id check: delegate parsing of arn to handlers.
148
            if not SecretsmanagerProvider._validate_secret_id(secret_id):
1✔
149
                raise ValidationException(
1✔
150
                    "Invalid name. Must be a valid name containing alphanumeric "
151
                    "characters, or any of the following: -/_+=.@!"
152
                )
153

154
    @staticmethod
1✔
155
    def _raise_if_missing_client_req_token(
1✔
156
        request: CreateSecretRequest
157
        | PutSecretValueRequest
158
        | RotateSecretRequest
159
        | UpdateSecretRequest,
160
    ):
161
        if "ClientRequestToken" not in request:
1✔
162
            raise InvalidRequestException(
1✔
163
                "You must provide a ClientRequestToken value. We recommend a UUID-type value."
164
            )
165

166
    @handler("CancelRotateSecret", expand=False)
1✔
167
    def cancel_rotate_secret(
1✔
168
        self, context: RequestContext, request: CancelRotateSecretRequest
169
    ) -> CancelRotateSecretResponse:
170
        self._raise_if_invalid_secret_id(request["SecretId"])
×
171
        return call_moto(context, request)
×
172

173
    @handler("CreateSecret", expand=False)
1✔
174
    def create_secret(
1✔
175
        self, context: RequestContext, request: CreateSecretRequest
176
    ) -> CreateSecretResponse:
177
        self._raise_if_missing_client_req_token(request)
1✔
178
        # Some providers need to create keys which are not usually creatable by users
179
        if not any(
1✔
180
            tag_entry["Key"] == "BYPASS_SECRET_ID_VALIDATION"
181
            for tag_entry in request.get("Tags", [])
182
        ):
183
            self._raise_if_invalid_secret_id(request["Name"])
1✔
184
        else:
185
            request["Tags"] = [
1✔
186
                tag_entry
187
                for tag_entry in request.get("Tags", [])
188
                if tag_entry["Key"] != "BYPASS_SECRET_ID_VALIDATION"
189
            ]
190

191
        return call_moto(context, request)
1✔
192

193
    @handler("DeleteResourcePolicy", expand=False)
1✔
194
    def delete_resource_policy(
1✔
195
        self, context: RequestContext, request: DeleteResourcePolicyRequest
196
    ) -> DeleteResourcePolicyResponse:
197
        self._raise_if_invalid_secret_id(request["SecretId"])
1✔
198
        return call_moto(context, request)
1✔
199

200
    @handler("DeleteSecret", expand=False)
1✔
201
    def delete_secret(
1✔
202
        self, context: RequestContext, request: DeleteSecretRequest
203
    ) -> DeleteSecretResponse:
204
        secret_id: str = request["SecretId"]
1✔
205
        self._raise_if_invalid_secret_id(secret_id)
1✔
206
        recovery_window_in_days: int | None = request.get("RecoveryWindowInDays")
1✔
207
        force_delete_without_recovery: bool | None = request.get("ForceDeleteWithoutRecovery")
1✔
208

209
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
210
        try:
1✔
211
            arn, name, deletion_date = backend.delete_secret(
1✔
212
                secret_id=secret_id,
213
                recovery_window_in_days=recovery_window_in_days,
214
                force_delete_without_recovery=force_delete_without_recovery,
215
            )
216
        except moto_exception.InvalidParameterException as e:
×
217
            raise InvalidParameterException(str(e))
×
218
        except moto_exception.InvalidRequestException:
×
219
            raise InvalidRequestException(
×
220
                "You tried to perform the operation on a secret that's currently marked deleted."
221
            )
222
        except moto_exception.SecretNotFoundException:
×
223
            raise SecretNotFoundException()
×
224
        return DeleteSecretResponse(ARN=arn, Name=name, DeletionDate=deletion_date)
1✔
225

226
    @handler("DescribeSecret", expand=False)
1✔
227
    def describe_secret(
1✔
228
        self, context: RequestContext, request: DescribeSecretRequest
229
    ) -> DescribeSecretResponse:
230
        secret_id: str = request["SecretId"]
1✔
231
        self._raise_if_invalid_secret_id(secret_id)
1✔
232
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
233
        try:
1✔
234
            secret = backend.describe_secret(secret_id)
1✔
235
        except moto_exception.SecretNotFoundException:
1✔
236
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
1✔
237
        return DescribeSecretResponse(**secret.to_dict())
1✔
238

239
    @handler("GetResourcePolicy", expand=False)
1✔
240
    def get_resource_policy(
1✔
241
        self, context: RequestContext, request: GetResourcePolicyRequest
242
    ) -> GetResourcePolicyResponse:
243
        secret_id = request["SecretId"]
1✔
244
        self._raise_if_invalid_secret_id(secret_id)
1✔
245
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
246
        policy = backend.get_resource_policy(secret_id)
1✔
247
        return GetResourcePolicyResponse(**json.loads(policy))
1✔
248

249
    @handler("GetSecretValue", expand=False)
1✔
250
    def get_secret_value(
1✔
251
        self, context: RequestContext, request: GetSecretValueRequest
252
    ) -> GetSecretValueResponse:
253
        secret_id = request.get("SecretId")
1✔
254
        version_id = request.get("VersionId")
1✔
255
        version_stage = request.get("VersionStage")
1✔
256
        if not version_id and not version_stage:
1✔
257
            version_stage = "AWSCURRENT"
1✔
258
        self._raise_if_invalid_secret_id(secret_id)
1✔
259
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
260
        self._raise_if_default_kms_key(secret_id, context, backend)
1✔
261
        try:
1✔
262
            response = backend.get_secret_value(secret_id, version_id, version_stage)
1✔
263
            response = decode_secret_binary_from_response(response)
1✔
264
        except moto_exception.SecretNotFoundException:
1✔
265
            raise ResourceNotFoundException(
1✔
266
                f"Secrets Manager can't find the specified secret value for staging label: {version_stage}"
267
            )
268
        except moto_exception.ResourceNotFoundException:
1✔
269
            error_message = (
1✔
270
                f"VersionId: {version_id}" if version_id else f"staging label: {version_stage}"
271
            )
272
            raise ResourceNotFoundException(
1✔
273
                f"Secrets Manager can't find the specified secret value for {error_message}"
274
            )
275
        except moto_exception.SecretStageVersionMismatchException:
1✔
276
            raise InvalidRequestException(
1✔
277
                "You provided a VersionStage that is not associated to the provided VersionId."
278
            )
279
        except moto_exception.SecretHasNoValueException:
1✔
280
            raise ResourceNotFoundException(
1✔
281
                f"Secrets Manager can't find the specified secret value for staging label: {version_stage}"
282
            )
283
        except moto_exception.InvalidRequestException:
×
284
            raise InvalidRequestException(
×
285
                "You can't perform this operation on the secret because it was marked for deletion."
286
            )
287
        return GetSecretValueResponse(**response)
1✔
288

289
    @handler("ListSecretVersionIds", expand=False)
1✔
290
    def list_secret_version_ids(
1✔
291
        self, context: RequestContext, request: ListSecretVersionIdsRequest
292
    ) -> ListSecretVersionIdsResponse:
293
        secret_id = request["SecretId"]
1✔
294
        include_deprecated = request.get("IncludeDeprecated", False)
1✔
295
        self._raise_if_invalid_secret_id(secret_id)
1✔
296
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
297
        secrets = backend.list_secret_version_ids(secret_id, include_deprecated=include_deprecated)
1✔
298
        return ListSecretVersionIdsResponse(**json.loads(secrets))
1✔
299

300
    @handler("PutResourcePolicy", expand=False)
1✔
301
    def put_resource_policy(
1✔
302
        self, context: RequestContext, request: PutResourcePolicyRequest
303
    ) -> PutResourcePolicyResponse:
304
        secret_id = request["SecretId"]
1✔
305
        self._raise_if_invalid_secret_id(secret_id)
1✔
306
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
307
        arn, name = backend.put_resource_policy(secret_id, request["ResourcePolicy"])
1✔
308
        return PutResourcePolicyResponse(ARN=arn, Name=name)
1✔
309

310
    @handler("PutSecretValue", expand=False)
1✔
311
    def put_secret_value(
1✔
312
        self, context: RequestContext, request: PutSecretValueRequest
313
    ) -> PutSecretValueResponse:
314
        secret_id = request["SecretId"]
1✔
315
        self._raise_if_invalid_secret_id(secret_id)
1✔
316
        self._raise_if_missing_client_req_token(request)
1✔
317
        client_req_token = request.get("ClientRequestToken")
1✔
318
        secret_string = request.get("SecretString")
1✔
319
        secret_binary = request.get("SecretBinary")
1✔
320
        if not secret_binary and not secret_string:
1✔
321
            raise InvalidRequestException("You must provide either SecretString or SecretBinary.")
×
322

323
        version_stages = request.get("VersionStages", ["AWSCURRENT"])
1✔
324
        if not isinstance(version_stages, list):
1✔
325
            version_stages = [version_stages]
×
326

327
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
328
        self._raise_if_default_kms_key(secret_id, context, backend)
1✔
329

330
        response = backend.put_secret_value(
1✔
331
            secret_id=secret_id,
332
            secret_binary=secret_binary,
333
            secret_string=secret_string,
334
            version_stages=version_stages,
335
            client_request_token=client_req_token,
336
        )
337
        return PutSecretValueResponse(**json.loads(response))
1✔
338

339
    @handler("RemoveRegionsFromReplication", expand=False)
1✔
340
    def remove_regions_from_replication(
1✔
341
        self, context: RequestContext, request: RemoveRegionsFromReplicationRequest
342
    ) -> RemoveRegionsFromReplicationResponse:
343
        self._raise_if_invalid_secret_id(request["SecretId"])
×
344
        return call_moto(context, request)
×
345

346
    @handler("ReplicateSecretToRegions", expand=False)
1✔
347
    def replicate_secret_to_regions(
1✔
348
        self, context: RequestContext, request: ReplicateSecretToRegionsRequest
349
    ) -> ReplicateSecretToRegionsResponse:
350
        self._raise_if_invalid_secret_id(request["SecretId"])
×
351
        return call_moto(context, request)
×
352

353
    @handler("RestoreSecret", expand=False)
1✔
354
    def restore_secret(
1✔
355
        self, context: RequestContext, request: RestoreSecretRequest
356
    ) -> RestoreSecretResponse:
357
        secret_id = request["SecretId"]
1✔
358
        self._raise_if_invalid_secret_id(secret_id)
1✔
359
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
360
        try:
1✔
361
            arn, name = backend.restore_secret(secret_id)
1✔
362
        except moto_exception.SecretNotFoundException:
×
363
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
×
364
        return RestoreSecretResponse(ARN=arn, Name=name)
1✔
365

366
    @handler("RotateSecret", expand=False)
1✔
367
    def rotate_secret(
1✔
368
        self, context: RequestContext, request: RotateSecretRequest
369
    ) -> RotateSecretResponse:
370
        self._raise_if_missing_client_req_token(request)
1✔
371
        self._raise_if_invalid_secret_id(request["SecretId"])
1✔
372
        return call_moto(context, request)
1✔
373

374
    @handler("StopReplicationToReplica", expand=False)
1✔
375
    def stop_replication_to_replica(
1✔
376
        self, context: RequestContext, request: StopReplicationToReplicaRequest
377
    ) -> StopReplicationToReplicaResponse:
378
        self._raise_if_invalid_secret_id(request["SecretId"])
×
379
        return call_moto(context, request)
×
380

381
    @handler("TagResource", expand=False)
1✔
382
    def tag_resource(self, context: RequestContext, request: TagResourceRequest) -> None:
1✔
383
        secret_id = request["SecretId"]
1✔
384
        tags = request["Tags"]
1✔
385
        self._raise_if_invalid_secret_id(secret_id)
1✔
386
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
387
        backend.tag_resource(secret_id, tags)
1✔
388

389
    @handler("UntagResource", expand=False)
1✔
390
    def untag_resource(self, context: RequestContext, request: UntagResourceRequest) -> None:
1✔
391
        secret_id = request["SecretId"]
1✔
392
        tag_keys = request.get("TagKeys")
1✔
393
        self._raise_if_invalid_secret_id(secret_id)
1✔
394
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
395
        backend.untag_resource(secret_id=secret_id, tag_keys=tag_keys)
1✔
396

397
    @handler("UpdateSecret", expand=False)
1✔
398
    def update_secret(
1✔
399
        self, context: RequestContext, request: UpdateSecretRequest
400
    ) -> UpdateSecretResponse:
401
        # if we're modifying the value of the secret, ClientRequestToken is required
402
        secret_id = request["SecretId"]
1✔
403
        secret_string = request.get("SecretString")
1✔
404
        secret_binary = request.get("SecretBinary")
1✔
405
        description = request.get("Description")
1✔
406
        kms_key_id = request.get("KmsKeyId")
1✔
407
        client_req_token = request.get("ClientRequestToken")
1✔
408
        self._raise_if_invalid_secret_id(secret_id)
1✔
409
        self._raise_if_missing_client_req_token(request)
1✔
410

411
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
1✔
412
        try:
1✔
413
            secret = backend.update_secret(
1✔
414
                secret_id,
415
                description=description,
416
                secret_string=secret_string,
417
                secret_binary=secret_binary,
418
                client_request_token=client_req_token,
419
                kms_key_id=kms_key_id,
420
            )
421
        except moto_exception.SecretNotFoundException:
×
422
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
×
423
        except moto_exception.OperationNotPermittedOnReplica:
×
424
            raise InvalidRequestException(
×
425
                "Operation not permitted on a replica secret. Call must be made in primary secret's region."
426
            )
427
        except moto_exception.InvalidRequestException:
×
428
            raise InvalidRequestException(
×
429
                "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
430
                "You can't perform this operation on the secret because it was marked for deletion."
431
            )
432
        return UpdateSecretResponse(**json.loads(secret))
1✔
433

434
    @handler("UpdateSecretVersionStage", expand=False)
1✔
435
    def update_secret_version_stage(
1✔
436
        self, context: RequestContext, request: UpdateSecretVersionStageRequest
437
    ) -> UpdateSecretVersionStageResponse:
438
        self._raise_if_invalid_secret_id(request["SecretId"])
1✔
439
        return call_moto(context, request)
1✔
440

441
    @handler("ValidateResourcePolicy", expand=False)
1✔
442
    def validate_resource_policy(
1✔
443
        self, context: RequestContext, request: ValidateResourcePolicyRequest
444
    ) -> ValidateResourcePolicyResponse:
445
        self._raise_if_invalid_secret_id(request["SecretId"])
1✔
446
        return call_moto(context, request)
×
447

448

449
@patch(FakeSecret.__init__)
1✔
450
def fake_secret__init__(fn, self, *args, **kwargs):
1✔
451
    fn(self, *args, **kwargs)
1✔
452

453
    # Fix time not including millis.
454
    time_now = time.time()
1✔
455
    if kwargs.get("last_changed_date", None):
1✔
456
        self.last_changed_date = time_now
1✔
457
    if kwargs.get("created_date", None):
1✔
458
        self.created_date = time_now
1✔
459

460
    # The last date that the secret value was retrieved.
461
    # This value does not include the time.
462
    # This field is omitted if the secret has never been retrieved.
463
    self.last_accessed_date = None
1✔
464
    # Results in RotationEnabled being returned only if rotation was ever overwritten,
465
    # in which case this field is non-null, but an integer.
466
    self.auto_rotate_after_days = None
1✔
467
    self.rotation_lambda_arn = None
1✔
468

469

470
@patch(FakeSecret.update)
1✔
471
def fake_secret_update(
1✔
472
    fn, self, description=None, tags=None, kms_key_id=None, last_changed_date=None
473
):
474
    fn(self, description, tags, kms_key_id, last_changed_date)
1✔
475
    if last_changed_date is not None:
1✔
476
        self.last_changed_date = round(time.time(), 3)
1✔
477

478

479
@patch(SecretsManagerBackend.get_secret_value)
1✔
480
def moto_smb_get_secret_value(fn, self, secret_id, version_id, version_stage):
1✔
481
    res = fn(self, secret_id, version_id, version_stage)
1✔
482

483
    secret = self.secrets[secret_id]
1✔
484

485
    # Patch: update last accessed date on get.
486
    secret.last_accessed_date = today_no_time()
1✔
487

488
    # Patch: update version's last accessed date.
489
    secret_version = secret.versions.get(version_id or secret.default_version_id)
1✔
490
    if secret_version:
1✔
491
        secret_version["last_accessed_date"] = secret.last_accessed_date
1✔
492

493
    return res
1✔
494

495

496
@patch(SecretsManagerBackend.create_secret)
1✔
497
def moto_smb_create_secret(fn, self, name, *args, **kwargs):
1✔
498
    # Creating a secret with a SecretId equal to one that is scheduled for
499
    # deletion should raise an 'InvalidRequestException'.
500
    secret: FakeSecret | None = self.secrets.get(name)
1✔
501
    if secret is not None and secret.deleted_date is not None:
1✔
502
        raise InvalidRequestException(AWS_INVALID_REQUEST_MESSAGE_CREATE_WITH_SCHEDULED_DELETION)
1✔
503

504
    if name in self.secrets:
1✔
505
        raise ResourceExistsException(
1✔
506
            f"The operation failed because the secret {name} already exists."
507
        )
508

509
    return fn(self, name, *args, **kwargs)
1✔
510

511

512
@patch(SecretsManagerBackend.list_secret_version_ids)
1✔
513
def moto_smb_list_secret_version_ids(
1✔
514
    _, self, secret_id: str, include_deprecated: bool, *args, **kwargs
515
):
516
    if secret_id not in self.secrets:
1✔
517
        raise SecretNotFoundException()
1✔
518

519
    if self.secrets[secret_id].is_deleted():
1✔
520
        raise InvalidRequestException(
×
521
            "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
522
            "You can't perform this operation on the secret because it was marked for deletion."
523
        )
524

525
    secret = self.secrets[secret_id]
1✔
526

527
    # Patch: output format, report exact createdate instead of current time.
528
    versions: list[SecretVersionsListEntry] = []
1✔
529
    for version_id, version in secret.versions.items():
1✔
530
        version_stages = version["version_stages"]
1✔
531
        # Patch: include deprecated versions if include_deprecated is True.
532
        # version_stages is empty if the version is deprecated.
533
        # see: https://docs.aws.amazon.com/secretsmanager/latest/userguide/getting-started.html#term_version
534
        if len(version_stages) > 0 or include_deprecated:
1✔
535
            entry = SecretVersionsListEntry(
1✔
536
                CreatedDate=version["createdate"],
537
                VersionId=version_id,
538
            )
539

540
            if version_stages:
1✔
541
                entry["VersionStages"] = version_stages
1✔
542

543
            # Patch: bind LastAccessedDate if one exists for this version.
544
            last_accessed_date = version.get("last_accessed_date")
1✔
545
            if last_accessed_date:
1✔
546
                entry["LastAccessedDate"] = last_accessed_date
1✔
547

548
            versions.append(entry)
1✔
549

550
    # Patch: sort versions by date.
551
    versions.sort(key=lambda v: v["CreatedDate"], reverse=True)
1✔
552

553
    response = ListSecretVersionIdsResponse(ARN=secret.arn, Name=secret.name, Versions=versions)
1✔
554

555
    return json.dumps(response)
1✔
556

557

558
@patch(FakeSecret.to_dict)
1✔
559
def fake_secret_to_dict(fn, self):
1✔
560
    res_dict = fn(self)
1✔
561
    if self.last_accessed_date:
1✔
562
        res_dict["LastAccessedDate"] = self.last_accessed_date
1✔
563
    if not self.description and "Description" in res_dict:
1✔
564
        del res_dict["Description"]
×
565
    if not self.rotation_enabled and "RotationEnabled" in res_dict:
1✔
566
        del res_dict["RotationEnabled"]
×
567
    if self.auto_rotate_after_days is None and "RotationRules" in res_dict:
1✔
568
        del res_dict["RotationRules"]
×
569
    if self.tags is None and "Tags" in res_dict:
1✔
570
        del res_dict["Tags"]
×
571
    for null_field in [key for key, value in res_dict.items() if value is None]:
1✔
572
        del res_dict[null_field]
1✔
573
    return res_dict
1✔
574

575

576
@patch(SecretsManagerBackend.update_secret)
1✔
577
def backend_update_secret(
1✔
578
    fn,
579
    self,
580
    secret_id,
581
    description=None,
582
    **kwargs,
583
):
584
    if secret_id not in self.secrets:
1✔
585
        raise SecretNotFoundException()
×
586

587
    if self.secrets[secret_id].is_deleted():
1✔
588
        raise InvalidRequestException(
×
589
            "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
590
            "You can't perform this operation on the secret because it was marked for deletion."
591
        )
592

593
    secret = self.secrets[secret_id]
1✔
594
    version_id_t0 = secret.default_version_id
1✔
595

596
    requires_new_version: bool = any(
1✔
597
        [kwargs.get("kms_key_id"), kwargs.get("secret_binary"), kwargs.get("secret_string")]
598
    )
599
    if requires_new_version:
1✔
600
        fn(self, secret_id, **kwargs)
1✔
601

602
    if description is not None:
1✔
603
        secret.description = description
1✔
604

605
    version_id_t1 = secret.default_version_id
1✔
606

607
    resp: UpdateSecretResponse = UpdateSecretResponse()
1✔
608
    resp["ARN"] = secret.arn
1✔
609
    resp["Name"] = secret.name
1✔
610

611
    if version_id_t0 != version_id_t1:
1✔
612
        resp["VersionId"] = version_id_t1
1✔
613

614
    return json.dumps(resp)
1✔
615

616

617
@patch(SecretsManagerBackend.tag_resource)
1✔
618
def backend_tag_resource(fn, self, secret_id, tags):
1✔
619
    if secret_id not in self.secrets:
1✔
620
        raise SecretNotFoundException()
×
621

622
    if self.secrets[secret_id].is_deleted():
1✔
623
        raise InvalidRequestException(
1✔
624
            "You can't perform this operation on the secret because it was marked for deletion."
625
        )
626

627
    return fn(self, secret_id, tags)
1✔
628

629

630
@patch(SecretsManagerBackend.untag_resource)
1✔
631
def backend_untag_resource(fn, self, secret_id, tag_keys):
1✔
632
    if secret_id not in self.secrets:
1✔
UNCOV
633
        raise SecretNotFoundException()
×
634

635
    if self.secrets[secret_id].is_deleted():
1✔
636
        raise InvalidRequestException(
1✔
637
            "You can't perform this operation on the secret because it was marked for deletion."
638
        )
639

640
    return fn(self, secret_id, tag_keys)
1✔
641

642

643
@patch(SecretsManagerResponse.update_secret, pass_target=False)
1✔
644
def response_update_secret(self):
1✔
UNCOV
645
    secret_id = self._get_param("SecretId")
×
UNCOV
646
    description = self._get_param("Description")
×
UNCOV
647
    secret_string = self._get_param("SecretString")
×
UNCOV
648
    secret_binary = self._get_param("SecretBinary")
×
UNCOV
649
    client_request_token = self._get_param("ClientRequestToken")
×
UNCOV
650
    kms_key_id = self._get_param("KmsKeyId")
×
UNCOV
651
    return self.backend.update_secret(
×
652
        secret_id=secret_id,
653
        description=description,
654
        secret_string=secret_string,
655
        secret_binary=secret_binary,
656
        client_request_token=client_request_token,
657
        kms_key_id=kms_key_id,
658
    )
659

660

661
@patch(SecretsManagerBackend.update_secret_version_stage)
1✔
662
def backend_update_secret_version_stage(
1✔
663
    fn, self, secret_id, version_stage, remove_from_version_id, move_to_version_id
664
):
665
    fn(self, secret_id, version_stage, remove_from_version_id, move_to_version_id)
1✔
666

667
    secret = self.secrets[secret_id]
1✔
668

669
    # Patch: default version is the new AWSCURRENT version
670
    if version_stage == AWSCURRENT:
1✔
671
        secret.default_version_id = move_to_version_id
1✔
672

673
    versions_no_stages = []
1✔
674
    for version_id, version in secret.versions.items():
1✔
675
        version_stages = version["version_stages"]
1✔
676

677
        # moto appends a new AWSPREVIOUS label to the version AWSCURRENT was removed from,
678
        # but it does not remove the old AWSPREVIOUS label.
679
        # Patch: ensure only one AWSPREVIOUS tagged version is in the pool.
680
        if (
1✔
681
            version_stage == AWSCURRENT
682
            and version_id != remove_from_version_id
683
            and AWSPREVIOUS in version_stages
684
        ):
UNCOV
685
            version_stages.remove(AWSPREVIOUS)
×
686

687
        if not version_stages:
1✔
688
            versions_no_stages.append(version_id)
1✔
689

690
    # Patch: remove secret versions with no version stages.
691
    for version_no_stages in versions_no_stages:
1✔
692
        del secret.versions[version_no_stages]
1✔
693

694
    return secret.arn, secret.name
1✔
695

696

697
@patch(FakeSecret.reset_default_version)
1✔
698
def fake_secret_reset_default_version(fn, self, secret_version, version_id):
1✔
699
    fn(self, secret_version, version_id)
1✔
700

701
    # Remove versions with no version stages, if max limit of outdated versions is exceeded.
702
    versions_no_stages: list[str] = [
1✔
703
        version_id for version_id, version in self.versions.items() if not version["version_stages"]
704
    ]
705
    versions_to_delete: list[str] = []
1✔
706

707
    # Patch: remove outdated versions if the max deprecated versions limit is exceeded.
708
    if len(versions_no_stages) >= MAX_OUTDATED_SECRET_VERSIONS:
1✔
709
        versions_to_delete = versions_no_stages[
1✔
710
            : len(versions_no_stages) - MAX_OUTDATED_SECRET_VERSIONS
711
        ]
712

713
    for version_to_delete in versions_to_delete:
1✔
714
        del self.versions[version_to_delete]
1✔
715

716

717
@patch(FakeSecret.remove_version_stages_from_old_versions)
1✔
718
def fake_secret_remove_version_stages_from_old_versions(fn, self, version_stages):
1✔
719
    fn(self, version_stages)
1✔
720
    # Remove versions with no version stages.
721
    versions_no_stages = [
1✔
722
        version_id for version_id, version in self.versions.items() if not version["version_stages"]
723
    ]
724
    for version_no_stages in versions_no_stages:
1✔
725
        del self.versions[version_no_stages]
1✔
726

727

728
# Moto does not support rotate_immediately as an API parameter while the AWS API does
729
@patch(SecretsManagerResponse.rotate_secret, pass_target=False)
1✔
730
def rotate_secret(self) -> str:
1✔
731
    client_request_token = self._get_param("ClientRequestToken")
1✔
732
    rotation_lambda_arn = self._get_param("RotationLambdaARN")
1✔
733
    rotation_rules = self._get_param("RotationRules")
1✔
734
    rotate_immediately = self._get_param("RotateImmediately")
1✔
735
    secret_id = self._get_param("SecretId")
1✔
736
    return self.backend.rotate_secret(
1✔
737
        secret_id=secret_id,
738
        client_request_token=client_request_token,
739
        rotation_lambda_arn=rotation_lambda_arn,
740
        rotation_rules=rotation_rules,
741
        rotate_immediately=True if rotate_immediately is None else rotate_immediately,
742
    )
743

744

745
@patch(SecretsManagerBackend.rotate_secret)
1✔
746
def backend_rotate_secret(
1✔
747
    _,
748
    self,
749
    secret_id,
750
    client_request_token=None,
751
    rotation_lambda_arn=None,
752
    rotation_rules=None,
753
    rotate_immediately=True,
754
):
755
    rotation_days = "AutomaticallyAfterDays"
1✔
756

757
    if not self._is_valid_identifier(secret_id):
1✔
UNCOV
758
        raise SecretNotFoundException()
×
759

760
    secret = self.secrets[secret_id]
1✔
761
    if secret.is_deleted():
1✔
UNCOV
762
        raise InvalidRequestException(
×
763
            "An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \
764
            perform the operation on a secret that's currently marked deleted."
765
        )
766
    # Resolve rotation_lambda_arn and fallback to previous value if its missing
767
    # from the current request
768
    rotation_lambda_arn = rotation_lambda_arn or secret.rotation_lambda_arn
1✔
769
    if not rotation_lambda_arn:
1✔
770
        raise InvalidRequestException(
1✔
771
            "No Lambda rotation function ARN is associated with this secret."
772
        )
773

774
    if rotation_lambda_arn:
1✔
775
        if len(rotation_lambda_arn) > 2048:
1✔
UNCOV
776
            msg = "RotationLambdaARN must <= 2048 characters long."
×
UNCOV
777
            raise InvalidParameterException(msg)
×
778

779
    # In case rotation_period is not provided, resolve auto_rotate_after_days
780
    # and fallback to previous value if its missing from the current request.
781
    rotation_period = secret.auto_rotate_after_days or 0
1✔
782
    if rotation_rules:
1✔
783
        if rotation_days in rotation_rules:
1✔
784
            rotation_period = rotation_rules[rotation_days]
1✔
785
            if rotation_period < 1 or rotation_period > 1000:
1✔
786
                msg = "RotationRules.AutomaticallyAfterDays must be within 1-1000."
×
787
                raise InvalidParameterException(msg)
×
788

789
    try:
1✔
790
        lm_client = connect_to(region_name=self.region_name).lambda_
1✔
791
        lm_client.get_function(FunctionName=rotation_lambda_arn)
1✔
792
    except Exception:
1✔
793
        raise ResourceNotFoundException("Lambda does not exist or could not be accessed")
1✔
794

795
    # The rotation function must end with the versions of the secret in
796
    # one of two states:
797
    #
798
    #  - The AWSPENDING and AWSCURRENT staging labels are attached to the
799
    #    same version of the secret, or
800
    #  - The AWSPENDING staging label is not attached to any version of the secret.
801
    #
802
    # If the AWSPENDING staging label is present but not attached to the same
803
    # version as AWSCURRENT then any later invocation of RotateSecret assumes
804
    # that a previous rotation request is still in progress and returns an error.
805
    try:
1✔
806
        pending_version = None
1✔
807
        version = next(
1✔
808
            version
809
            for version in secret.versions.values()
810
            if AWSPENDING in version["version_stages"]
811
        )
UNCOV
812
        if AWSCURRENT not in version["version_stages"]:
×
UNCOV
813
            msg = "Previous rotation request is still in progress."
×
814
            # Delay exception, so we can trigger lambda again
UNCOV
815
            pending_version = [InvalidRequestException(msg), version]
×
816

817
    except StopIteration:
1✔
818
        # Pending is not present in any version
819
        pass
1✔
820

821
    secret.rotation_lambda_arn = rotation_lambda_arn
1✔
822
    secret.auto_rotate_after_days = rotation_period
1✔
823
    if secret.auto_rotate_after_days > 0:
1✔
824
        wait_interval_s = int(rotation_period) * 86400
1✔
825
        secret.next_rotation_date = int(time.time()) + wait_interval_s
1✔
826
        secret.rotation_enabled = True
1✔
827
        secret.rotation_requested = True
1✔
828

829
    if rotate_immediately:
1✔
830
        if not pending_version:
1✔
831
            # Begin the rotation process for the given secret by invoking the lambda function.
832
            #
833
            # We add the new secret version as "pending". The previous version remains
834
            # as "current" for now. Once we've passed the new secret through the lambda
835
            # rotation function (if provided) we can then update the status to "current".
836
            new_version_id = self._from_client_request_token(client_request_token)
1✔
837

838
            # An initial dummy secret value is necessary otherwise moto is not adding the new
839
            # secret version.
840
            self._add_secret(
1✔
841
                secret_id,
842
                "dummy_password",
843
                description=secret.description,
844
                tags=secret.tags,
845
                version_id=new_version_id,
846
                version_stages=[AWSPENDING],
847
            )
848

849
            # AWS secret rotation function templates have checks on existing values so we remove
850
            # the dummy value to force the lambda to generate a new one.
851
            del secret.versions[new_version_id]["secret_string"]
1✔
852
        else:
UNCOV
853
            new_version_id = pending_version.pop()["version_id"]
×
854

855
        try:
1✔
856
            for step in ["create", "set", "test", "finish"]:
1✔
857
                resp = lm_client.invoke(
1✔
858
                    FunctionName=rotation_lambda_arn,
859
                    Payload=json.dumps(
860
                        {
861
                            "Step": step + "Secret",
862
                            "SecretId": secret.name,
863
                            "ClientRequestToken": new_version_id,
864
                        }
865
                    ),
866
                )
867
                if resp.get("FunctionError"):
1✔
UNCOV
868
                    data = json.loads(resp.get("Payload").read())
×
UNCOV
869
                    raise Exception(data.get("errorType"))
×
UNCOV
870
        except Exception as e:
×
UNCOV
871
            LOG.debug("An exception (%s) has occurred in %s", str(e), rotation_lambda_arn)
×
UNCOV
872
            if pending_version:
×
UNCOV
873
                raise pending_version.pop()
×
874
            # Fall through if there is no previously pending version so we'll "stuck" with a new
875
            # secret version in AWSPENDING state.
876
    secret.last_rotation_date = int(time.time())
1✔
877
    return secret.to_short_dict(version_id=new_version_id)
1✔
878

879

880
@patch(moto_exception.SecretNotFoundException.__init__)
1✔
881
def moto_secret_not_found_exception_init(fn, self):
1✔
882
    fn(self)
1✔
883
    self.code = 400
1✔
884

885

886
@patch(FakeSecret._form_version_ids_to_stages, pass_target=False)
1✔
887
def _form_version_ids_to_stages_modal(self):
1✔
888
    version_id_to_stages: dict[str, list] = {}
1✔
889
    for key, value in self.versions.items():
1✔
890
        # Patch: include version_stages in the response only if it is not empty.
891
        if len(value["version_stages"]) > 0:
1✔
892
            version_id_to_stages[key] = value["version_stages"]
1✔
893
    return version_id_to_stages
1✔
894

895

896
# patching resource policy in moto
897
def get_resource_policy_model(self, secret_id):
1✔
898
    if self._is_valid_identifier(secret_id):
1✔
899
        result = {
1✔
900
            "ARN": self.secrets[secret_id].arn,
901
            "Name": self.secrets[secret_id].secret_id,
902
        }
903
        policy = getattr(self.secrets[secret_id], "policy", None)
1✔
904
        if policy:
1✔
905
            result["ResourcePolicy"] = policy
1✔
906
        return json.dumps(result)
1✔
907
    else:
UNCOV
908
        raise SecretNotFoundException()
×
909

910

911
def get_resource_policy_response(self):
1✔
912
    secret_id = self._get_param("SecretId")
×
UNCOV
913
    return self.backend.get_resource_policy(secret_id=secret_id)
×
914

915

916
def decode_secret_binary_from_response(response: dict[str, Any]):
1✔
917
    if "SecretBinary" in response:
1✔
918
        response["SecretBinary"] = base64.b64decode(response["SecretBinary"])
1✔
919

920
    return response
1✔
921

922

923
def delete_resource_policy_model(self, secret_id):
1✔
UNCOV
924
    if self._is_valid_identifier(secret_id):
×
UNCOV
925
        self.secrets[secret_id].policy = None
×
UNCOV
926
        return json.dumps(
×
927
            {
928
                "ARN": self.secrets[secret_id].arn,
929
                "Name": self.secrets[secret_id].secret_id,
930
            }
931
        )
932
    else:
933
        raise SecretNotFoundException()
×
934

935

936
def delete_resource_policy_response(self):
1✔
UNCOV
937
    secret_id = self._get_param("SecretId")
×
UNCOV
938
    return self.backend.delete_resource_policy(secret_id=secret_id)
×
939

940

941
def put_resource_policy_model(self, secret_id, resource_policy):
1✔
UNCOV
942
    policy_validator = IAMPolicyDocumentValidator(resource_policy)
×
UNCOV
943
    policy_validator._validate_top_elements()
×
944
    policy_validator._validate_version_syntax()
×
UNCOV
945
    if self._is_valid_identifier(secret_id):
×
946
        self.secrets[secret_id].policy = resource_policy
×
UNCOV
947
        return json.dumps(
×
948
            {
949
                "ARN": self.secrets[secret_id].arn,
950
                "Name": self.secrets[secret_id].secret_id,
951
            }
952
        )
953
    else:
UNCOV
954
        raise SecretNotFoundException()
×
955

956

957
def put_resource_policy_response(self):
1✔
UNCOV
958
    secret_id = self._get_param("SecretId")
×
UNCOV
959
    resource_policy = self._get_param("ResourcePolicy")
×
UNCOV
960
    return self.backend.put_resource_policy(
×
961
        secret_id=secret_id, resource_policy=json.loads(resource_policy)
962
    )
963

964

965
def apply_patches():
1✔
966
    SecretsManagerBackend.get_resource_policy = get_resource_policy_model
1✔
967
    SecretsManagerResponse.get_resource_policy = get_resource_policy_response
1✔
968

969
    if not hasattr(SecretsManagerBackend, "delete_resource_policy"):
1✔
UNCOV
970
        SecretsManagerBackend.delete_resource_policy = delete_resource_policy_model
×
971
    if not hasattr(SecretsManagerResponse, "delete_resource_policy"):
1✔
UNCOV
972
        SecretsManagerResponse.delete_resource_policy = delete_resource_policy_response
×
973
    if not hasattr(SecretsManagerBackend, "put_resource_policy"):
1✔
UNCOV
974
        SecretsManagerBackend.put_resource_policy = put_resource_policy_model
×
975
    if not hasattr(SecretsManagerResponse, "put_resource_policy"):
1✔
UNCOV
976
        SecretsManagerResponse.put_resource_policy = put_resource_policy_response
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc