localstack / localstack / 21891233725

10 Feb 2026 10:59PM UTC coverage: 86.883% (+0.01%) from 86.871%

SFN: Fix Local mock iterations for states without retry (#13693)

Co-authored-by: Hernan <hernaner28@gmail.com>

The Step Functions Local mock configuration used RetryCount to index numbered mock responses. For successful invocations, or for states without a retry configuration, every invocation therefore returned the same response ("0") instead of iterating through the sequence ("0", "1", "2", etc.).

RetryCount only increments on actual retry attempts (failures), not on successful invocations. This made the mock iteration feature unusable for testing state transition scenarios with multiple invocations of the same state.

- Added a next_local_mock_invocation_number dict to the execution environment to track mock invocations.
- Modified get_current_local_mocked_response() to use next_local_mock_invocation_number instead of retry_count.
- Shared next_local_mock_invocation_number between parent and child frames to keep counting consistent across the execution context.
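
The difference between the two indexing strategies can be sketched in a few lines. This is a toy model under stated assumptions, not LocalStack's implementation: `ToyEnvironment`, `open_child_frame`, `response_by_retry_count`, and `response_by_invocation_number` are hypothetical names, and only `next_local_mock_invocation_number` is taken from the change description.

```python
from dataclasses import dataclass, field


@dataclass
class ToyEnvironment:
    """Hypothetical stand-in for the execution environment (not LocalStack's class)."""

    # Maps a state name to the index of the next numbered mock response to serve.
    next_local_mock_invocation_number: dict[str, int] = field(default_factory=dict)

    def open_child_frame(self) -> "ToyEnvironment":
        # The child frame reuses the same dict object, so parent and child count together.
        return ToyEnvironment(self.next_local_mock_invocation_number)


def response_by_retry_count(responses: dict[str, str], retry_count: int) -> str:
    # Old lookup: retry_count only grows when an attempt fails and is retried.
    return responses[str(retry_count)]


def response_by_invocation_number(
    env: ToyEnvironment, state_name: str, responses: dict[str, str]
) -> str:
    # New lookup: read the per-state counter, then advance it for the next invocation.
    number = env.next_local_mock_invocation_number.get(state_name, 0)
    env.next_local_mock_invocation_number[state_name] = number + 1
    return responses[str(number)]


responses = {"0": "first", "1": "second", "2": "third"}
parent = ToyEnvironment()
child = parent.open_child_frame()

# Three successful invocations never bump retry_count, so the old lookup repeats "0".
old = [response_by_retry_count(responses, retry_count=0) for _ in range(3)]

# The counter advances on every invocation, including one made from a child frame.
new = [
    response_by_invocation_number(parent, "MyState", responses),
    response_by_invocation_number(child, "MyState", responses),
    response_by_invocation_number(parent, "MyState", responses),
]
print(old)  # ['first', 'first', 'first']
print(new)  # ['first', 'second', 'third']
```

In this sketch, a mock that defines only response "0" raises `KeyError` on the second counter-based lookup, while the retry-count lookup keeps returning "0" without complaint.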

The existing test `aws.services.stepfunctions.v2.local_mocking.test_base_scenarios.TestBaseScenarios.test_map_state_lambda` was giving a false positive. It used a mock response with only one mocked invocation and should have failed when two invocations were made. It did start failing after the fix, so its mocked response has been adjusted.

Also adds `test_numbered_mock_responses_multiple_success_invocations`, which tests multiple successful invocations of a regular top-level state, outside of a Map configuration.

7 of 7 new or added lines in 1 file covered. (100.0%)

255 existing lines in 12 files now uncovered.

69977 of 80542 relevant lines covered (86.88%)

0.87 hits per line
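
The headline percentage can be checked against the line counts in the summary. A minimal sketch, assuming the report computes covered lines divided by relevant lines and rounds for display (the variable names are illustrative):

```python
covered_lines = 69977
relevant_lines = 80542
previous_pct = 86.871  # the "from" value shown in the build header

coverage_pct = covered_lines / relevant_lines * 100
delta = coverage_pct - previous_pct

print(f"{coverage_pct:.3f}%")  # 86.883%
print(f"{coverage_pct:.2f}%")  # 86.88%
print(f"{delta:+.2f}%")        # +0.01%
```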

Source File

83.66
/localstack-core/localstack/services/secretsmanager/provider.py
from __future__ import annotations

import base64
import json
import logging
import re
import time
from typing import Any, Final

import moto.secretsmanager.exceptions as moto_exception
from botocore.utils import InvalidArnException
from moto.iam.policy_validation import IAMPolicyDocumentValidator
from moto.secretsmanager import secretsmanager_backends
from moto.secretsmanager.models import FakeSecret, SecretsManagerBackend
from moto.secretsmanager.responses import SecretsManagerResponse

from localstack.aws.api import CommonServiceException, RequestContext, handler
from localstack.aws.api.secretsmanager import (
    CancelRotateSecretRequest,
    CancelRotateSecretResponse,
    CreateSecretRequest,
    CreateSecretResponse,
    DeleteResourcePolicyRequest,
    DeleteResourcePolicyResponse,
    DeleteSecretRequest,
    DeleteSecretResponse,
    DescribeSecretRequest,
    DescribeSecretResponse,
    GetResourcePolicyRequest,
    GetResourcePolicyResponse,
    GetSecretValueRequest,
    GetSecretValueResponse,
    InvalidParameterException,
    InvalidRequestException,
    ListSecretVersionIdsRequest,
    ListSecretVersionIdsResponse,
    NameType,
    PutResourcePolicyRequest,
    PutResourcePolicyResponse,
    PutSecretValueRequest,
    PutSecretValueResponse,
    RemoveRegionsFromReplicationRequest,
    RemoveRegionsFromReplicationResponse,
    ReplicateSecretToRegionsRequest,
    ReplicateSecretToRegionsResponse,
    ResourceExistsException,
    ResourceNotFoundException,
    RestoreSecretRequest,
    RestoreSecretResponse,
    RotateSecretRequest,
    RotateSecretResponse,
    SecretIdType,
    SecretsmanagerApi,
    SecretVersionsListEntry,
    StopReplicationToReplicaRequest,
    StopReplicationToReplicaResponse,
    TagResourceRequest,
    UntagResourceRequest,
    UpdateSecretRequest,
    UpdateSecretResponse,
    UpdateSecretVersionStageRequest,
    UpdateSecretVersionStageResponse,
    ValidateResourcePolicyRequest,
    ValidateResourcePolicyResponse,
)
from localstack.aws.connect import connect_to
from localstack.services.moto import call_moto
from localstack.state import StateVisitor
from localstack.utils.aws import arns
from localstack.utils.patch import patch
from localstack.utils.time import today_no_time

# Constants.
AWSPREVIOUS: Final[str] = "AWSPREVIOUS"
AWSPENDING: Final[str] = "AWSPENDING"
AWSCURRENT: Final[str] = "AWSCURRENT"
# The maximum number of outdated versions that can be stored in the secret.
# see: https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_PutSecretValue.html
MAX_OUTDATED_SECRET_VERSIONS: Final[int] = 100
#
# Error Messages.
AWS_INVALID_REQUEST_MESSAGE_CREATE_WITH_SCHEDULED_DELETION: Final[str] = (
    "You can't create this secret because a secret with this name is already scheduled for deletion."
)

LOG = logging.getLogger(__name__)


class ValidationException(CommonServiceException):
    def __init__(self, message: str):
        super().__init__("ValidationException", message, 400, True)


class SecretNotFoundException(CommonServiceException):
    def __init__(self):
        super().__init__(
            "ResourceNotFoundException",
            "Secrets Manager can't find the specified secret.",
            400,
            True,
        )


class SecretsmanagerProvider(SecretsmanagerApi):
    def __init__(self):
        super().__init__()
        apply_patches()

    def accept_state_visitor(self, visitor: StateVisitor):
        visitor.visit(secretsmanager_backends)  # uncovered

    @staticmethod
    def get_moto_backend_for_resource(
        name_or_arn: str, context: RequestContext
    ) -> SecretsManagerBackend:
        try:
            arn_data = arns.parse_arn(name_or_arn)
            backend = secretsmanager_backends[arn_data["account"]][arn_data["region"]]
        except InvalidArnException:
            backend = secretsmanager_backends[context.account_id][context.region]
        return backend

    @staticmethod
    def _raise_if_default_kms_key(
        secret_id: str, request: RequestContext, backend: SecretsManagerBackend
    ):
        try:
            secret = backend.describe_secret(secret_id)
        except moto_exception.SecretNotFoundException:
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
        if secret.kms_key_id is None and request.account_id != secret.account_id:
            raise InvalidRequestException(
                "You can't access a secret from a different AWS account if you encrypt the secret with the default KMS service key."
            )

    @staticmethod
    def _validate_secret_id(secret_id: SecretIdType) -> bool:
        # The secret name can contain ASCII letters, numbers, and the following characters: /_+=.@-
        return bool(re.match(r"^[A-Za-z0-9/_+=.@-]+\Z", secret_id))

    @staticmethod
    def _raise_if_invalid_secret_id(secret_id: SecretIdType | NameType):
        # Patches moto's implementation for which secret_ids are not validated, by raising a ValidationException.
        # Skips this check if the secret_id provided appears to be an arn (starting with 'arn:').
        if not re.match(
            r"^arn:", secret_id
        ):  # Check if it appears to be an arn: so to skip secret_id check: delegate parsing of arn to handlers.
            if not SecretsmanagerProvider._validate_secret_id(secret_id):
                raise ValidationException(
                    "Invalid name. Must be a valid name containing alphanumeric "
                    "characters, or any of the following: -/_+=.@!"
                )

    @staticmethod
    def _raise_if_missing_client_req_token(
        request: CreateSecretRequest
        | PutSecretValueRequest
        | RotateSecretRequest
        | UpdateSecretRequest,
    ):
        if "ClientRequestToken" not in request:
            raise InvalidRequestException(
                "You must provide a ClientRequestToken value. We recommend a UUID-type value."
            )

    @handler("CancelRotateSecret", expand=False)
    def cancel_rotate_secret(
        self, context: RequestContext, request: CancelRotateSecretRequest
    ) -> CancelRotateSecretResponse:
        self._raise_if_invalid_secret_id(request["SecretId"])  # uncovered
        return call_moto(context, request)  # uncovered

    @handler("CreateSecret", expand=False)
    def create_secret(
        self, context: RequestContext, request: CreateSecretRequest
    ) -> CreateSecretResponse:
        self._raise_if_missing_client_req_token(request)
        # Some providers need to create keys which are not usually creatable by users
        if not any(
            tag_entry["Key"] == "BYPASS_SECRET_ID_VALIDATION"
            for tag_entry in request.get("Tags", [])
        ):
            self._raise_if_invalid_secret_id(request["Name"])
        else:
            request["Tags"] = [
                tag_entry
                for tag_entry in request.get("Tags", [])
                if tag_entry["Key"] != "BYPASS_SECRET_ID_VALIDATION"
            ]

        return call_moto(context, request)

    @handler("DeleteResourcePolicy", expand=False)
    def delete_resource_policy(
        self, context: RequestContext, request: DeleteResourcePolicyRequest
    ) -> DeleteResourcePolicyResponse:
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("DeleteSecret", expand=False)
    def delete_secret(
        self, context: RequestContext, request: DeleteSecretRequest
    ) -> DeleteSecretResponse:
        secret_id: str = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        recovery_window_in_days: int | None = request.get("RecoveryWindowInDays")
        force_delete_without_recovery: bool | None = request.get("ForceDeleteWithoutRecovery")

        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        try:
            arn, name, deletion_date = backend.delete_secret(
                secret_id=secret_id,
                recovery_window_in_days=recovery_window_in_days,
                force_delete_without_recovery=force_delete_without_recovery,
            )
        except moto_exception.InvalidParameterException as e:  # uncovered
            raise InvalidParameterException(str(e))  # uncovered
        except moto_exception.InvalidRequestException:  # uncovered
            raise InvalidRequestException(  # uncovered
                "You tried to perform the operation on a secret that's currently marked deleted."
            )
        except moto_exception.SecretNotFoundException:  # uncovered
            raise SecretNotFoundException()  # uncovered
        return DeleteSecretResponse(ARN=arn, Name=name, DeletionDate=deletion_date)

    @handler("DescribeSecret", expand=False)
    def describe_secret(
        self, context: RequestContext, request: DescribeSecretRequest
    ) -> DescribeSecretResponse:
        secret_id: str = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        try:
            secret = backend.describe_secret(secret_id)
        except moto_exception.SecretNotFoundException:
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")
        return DescribeSecretResponse(**secret.to_dict())

    @handler("GetResourcePolicy", expand=False)
    def get_resource_policy(
        self, context: RequestContext, request: GetResourcePolicyRequest
    ) -> GetResourcePolicyResponse:
        secret_id = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        policy = backend.get_resource_policy(secret_id)
        return GetResourcePolicyResponse(**json.loads(policy))

    @handler("GetSecretValue", expand=False)
    def get_secret_value(
        self, context: RequestContext, request: GetSecretValueRequest
    ) -> GetSecretValueResponse:
        secret_id = request.get("SecretId")
        version_id = request.get("VersionId")
        version_stage = request.get("VersionStage")
        if not version_id and not version_stage:
            version_stage = "AWSCURRENT"
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        self._raise_if_default_kms_key(secret_id, context, backend)
        try:
            response = backend.get_secret_value(secret_id, version_id, version_stage)
            response = decode_secret_binary_from_response(response)
        except moto_exception.SecretNotFoundException:
            raise ResourceNotFoundException(
                f"Secrets Manager can't find the specified secret value for staging label: {version_stage}"
            )
        except moto_exception.ResourceNotFoundException:
            error_message = (
                f"VersionId: {version_id}" if version_id else f"staging label: {version_stage}"
            )
            raise ResourceNotFoundException(
                f"Secrets Manager can't find the specified secret value for {error_message}"
            )
        except moto_exception.SecretStageVersionMismatchException:
            raise InvalidRequestException(
                "You provided a VersionStage that is not associated to the provided VersionId."
            )
        except moto_exception.SecretHasNoValueException:
            raise ResourceNotFoundException(
                f"Secrets Manager can't find the specified secret value for staging label: {version_stage}"
            )
        except moto_exception.InvalidRequestException:  # uncovered
            raise InvalidRequestException(  # uncovered
                "You can't perform this operation on the secret because it was marked for deletion."
            )
        return GetSecretValueResponse(**response)

    @handler("ListSecretVersionIds", expand=False)
    def list_secret_version_ids(
        self, context: RequestContext, request: ListSecretVersionIdsRequest
    ) -> ListSecretVersionIdsResponse:
        secret_id = request["SecretId"]
        include_deprecated = request.get("IncludeDeprecated", False)
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        secrets = backend.list_secret_version_ids(secret_id, include_deprecated=include_deprecated)
        return ListSecretVersionIdsResponse(**json.loads(secrets))

    @handler("PutResourcePolicy", expand=False)
    def put_resource_policy(
        self, context: RequestContext, request: PutResourcePolicyRequest
    ) -> PutResourcePolicyResponse:
        secret_id = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        arn, name = backend.put_resource_policy(secret_id, request["ResourcePolicy"])
        return PutResourcePolicyResponse(ARN=arn, Name=name)

    @handler("PutSecretValue", expand=False)
    def put_secret_value(
        self, context: RequestContext, request: PutSecretValueRequest
    ) -> PutSecretValueResponse:
        secret_id = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        self._raise_if_missing_client_req_token(request)
        client_req_token = request.get("ClientRequestToken")
        secret_string = request.get("SecretString")
        secret_binary = request.get("SecretBinary")
        if not secret_binary and not secret_string:
            raise InvalidRequestException("You must provide either SecretString or SecretBinary.")  # uncovered
        if secret_binary:
            secret_binary = base64.b64encode(secret_binary)
        version_stages = request.get("VersionStages", ["AWSCURRENT"])
        if not isinstance(version_stages, list):
            version_stages = [version_stages]  # uncovered

        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        self._raise_if_default_kms_key(secret_id, context, backend)

        response = backend.put_secret_value(
            secret_id=secret_id,
            secret_binary=secret_binary,
            secret_string=secret_string,
            version_stages=version_stages,
            client_request_token=client_req_token,
        )
        return PutSecretValueResponse(**json.loads(response))

    @handler("RemoveRegionsFromReplication", expand=False)
    def remove_regions_from_replication(
        self, context: RequestContext, request: RemoveRegionsFromReplicationRequest
    ) -> RemoveRegionsFromReplicationResponse:
        self._raise_if_invalid_secret_id(request["SecretId"])  # uncovered
        return call_moto(context, request)  # uncovered

    @handler("ReplicateSecretToRegions", expand=False)
    def replicate_secret_to_regions(
        self, context: RequestContext, request: ReplicateSecretToRegionsRequest
    ) -> ReplicateSecretToRegionsResponse:
        self._raise_if_invalid_secret_id(request["SecretId"])  # uncovered
        return call_moto(context, request)  # uncovered

    @handler("RestoreSecret", expand=False)
    def restore_secret(
        self, context: RequestContext, request: RestoreSecretRequest
    ) -> RestoreSecretResponse:
        secret_id = request["SecretId"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        try:
            arn, name = backend.restore_secret(secret_id)
        except moto_exception.SecretNotFoundException:  # uncovered
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")  # uncovered
        return RestoreSecretResponse(ARN=arn, Name=name)

    @handler("RotateSecret", expand=False)
    def rotate_secret(
        self, context: RequestContext, request: RotateSecretRequest
    ) -> RotateSecretResponse:
        self._raise_if_missing_client_req_token(request)
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("StopReplicationToReplica", expand=False)
    def stop_replication_to_replica(
        self, context: RequestContext, request: StopReplicationToReplicaRequest
    ) -> StopReplicationToReplicaResponse:
        self._raise_if_invalid_secret_id(request["SecretId"])  # uncovered
        return call_moto(context, request)  # uncovered

    @handler("TagResource", expand=False)
    def tag_resource(self, context: RequestContext, request: TagResourceRequest) -> None:
        secret_id = request["SecretId"]
        tags = request["Tags"]
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        backend.tag_resource(secret_id, tags)

    @handler("UntagResource", expand=False)
    def untag_resource(self, context: RequestContext, request: UntagResourceRequest) -> None:
        secret_id = request["SecretId"]
        tag_keys = request.get("TagKeys")
        self._raise_if_invalid_secret_id(secret_id)
        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        backend.untag_resource(secret_id=secret_id, tag_keys=tag_keys)

    @handler("UpdateSecret", expand=False)
    def update_secret(
        self, context: RequestContext, request: UpdateSecretRequest
    ) -> UpdateSecretResponse:
        # if we're modifying the value of the secret, ClientRequestToken is required
        secret_id = request["SecretId"]
        secret_string = request.get("SecretString")
        secret_binary = request.get("SecretBinary")
        if secret_binary:
            secret_binary = base64.b64encode(secret_binary)
        description = request.get("Description")
        kms_key_id = request.get("KmsKeyId")
        client_req_token = request.get("ClientRequestToken")
        self._raise_if_invalid_secret_id(secret_id)
        self._raise_if_missing_client_req_token(request)

        backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
        try:
            secret = backend.update_secret(
                secret_id,
                description=description,
                secret_string=secret_string,
                secret_binary=secret_binary,
                client_request_token=client_req_token,
                kms_key_id=kms_key_id,
            )
        except moto_exception.SecretNotFoundException:  # uncovered
            raise ResourceNotFoundException("Secrets Manager can't find the specified secret.")  # uncovered
        except moto_exception.OperationNotPermittedOnReplica:  # uncovered
            raise InvalidRequestException(  # uncovered
                "Operation not permitted on a replica secret. Call must be made in primary secret's region."
            )
        except moto_exception.InvalidRequestException:  # uncovered
            raise InvalidRequestException(  # uncovered
                "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
                "You can't perform this operation on the secret because it was marked for deletion."
            )
        return UpdateSecretResponse(**json.loads(secret))

    @handler("UpdateSecretVersionStage", expand=False)
    def update_secret_version_stage(
        self, context: RequestContext, request: UpdateSecretVersionStageRequest
    ) -> UpdateSecretVersionStageResponse:
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)

    @handler("ValidateResourcePolicy", expand=False)
    def validate_resource_policy(
        self, context: RequestContext, request: ValidateResourcePolicyRequest
    ) -> ValidateResourcePolicyResponse:
        self._raise_if_invalid_secret_id(request["SecretId"])
        return call_moto(context, request)  # uncovered


@patch(FakeSecret.__init__)
def fake_secret__init__(fn, self, *args, **kwargs):
    fn(self, *args, **kwargs)

    # Fix time not including millis.
    time_now = time.time()
    if kwargs.get("last_changed_date", None):
        self.last_changed_date = time_now
    if kwargs.get("created_date", None):
        self.created_date = time_now

    # The last date that the secret value was retrieved.
    # This value does not include the time.
    # This field is omitted if the secret has never been retrieved.
    self.last_accessed_date = None
    # Results in RotationEnabled being returned only if rotation was ever overwritten,
    # in which case this field is non-null, but an integer.
    self.auto_rotate_after_days = None
    self.rotation_lambda_arn = None


@patch(FakeSecret.update)
def fake_secret_update(
    fn, self, description=None, tags=None, kms_key_id=None, last_changed_date=None
):
    fn(self, description, tags, kms_key_id, last_changed_date)
    if last_changed_date is not None:
        self.last_changed_date = round(time.time(), 3)


@patch(SecretsManagerBackend.get_secret_value)
def moto_smb_get_secret_value(fn, self, secret_id, version_id, version_stage):
    res = fn(self, secret_id, version_id, version_stage)

    secret = self.secrets[secret_id]

    # Patch: update last accessed date on get.
    secret.last_accessed_date = today_no_time()

    # Patch: update version's last accessed date.
    secret_version = secret.versions.get(version_id or secret.default_version_id)
    if secret_version:
        secret_version["last_accessed_date"] = secret.last_accessed_date

    return res


@patch(SecretsManagerBackend.create_secret)
def moto_smb_create_secret(fn, self, name, *args, **kwargs):
    # Creating a secret with a SecretId equal to one that is scheduled for
    # deletion should raise an 'InvalidRequestException'.
    secret: FakeSecret | None = self.secrets.get(name)
    if secret is not None and secret.deleted_date is not None:
        raise InvalidRequestException(AWS_INVALID_REQUEST_MESSAGE_CREATE_WITH_SCHEDULED_DELETION)

    if name in self.secrets:
        raise ResourceExistsException(
            f"The operation failed because the secret {name} already exists."
        )

    return fn(self, name, *args, **kwargs)


@patch(SecretsManagerBackend.list_secret_version_ids)
def moto_smb_list_secret_version_ids(
    _, self, secret_id: str, include_deprecated: bool, *args, **kwargs
):
    if secret_id not in self.secrets:
        raise SecretNotFoundException()

    if self.secrets[secret_id].is_deleted():
        raise InvalidRequestException(  # uncovered
            "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
            "You can't perform this operation on the secret because it was marked for deletion."
        )

    secret = self.secrets[secret_id]

    # Patch: output format, report exact createdate instead of current time.
    versions: list[SecretVersionsListEntry] = []
    for version_id, version in secret.versions.items():
        version_stages = version["version_stages"]
        # Patch: include deprecated versions if include_deprecated is True.
        # version_stages is empty if the version is deprecated.
        # see: https://docs.aws.amazon.com/secretsmanager/latest/userguide/getting-started.html#term_version
        if len(version_stages) > 0 or include_deprecated:
            entry = SecretVersionsListEntry(
                CreatedDate=version["createdate"],
                VersionId=version_id,
            )

            if version_stages:
                entry["VersionStages"] = version_stages

            # Patch: bind LastAccessedDate if one exists for this version.
            last_accessed_date = version.get("last_accessed_date")
            if last_accessed_date:
                entry["LastAccessedDate"] = last_accessed_date

            versions.append(entry)

    # Patch: sort versions by date.
    versions.sort(key=lambda v: v["CreatedDate"], reverse=True)

    response = ListSecretVersionIdsResponse(ARN=secret.arn, Name=secret.name, Versions=versions)

    return json.dumps(response)


@patch(FakeSecret.to_dict)
def fake_secret_to_dict(fn, self):
    res_dict = fn(self)
    if self.last_accessed_date:
        res_dict["LastAccessedDate"] = self.last_accessed_date
    if not self.description and "Description" in res_dict:
        del res_dict["Description"]  # uncovered
    if not self.rotation_enabled and "RotationEnabled" in res_dict:
        del res_dict["RotationEnabled"]  # uncovered
    if self.auto_rotate_after_days is None and "RotationRules" in res_dict:
        del res_dict["RotationRules"]  # uncovered
    if self.tags is None and "Tags" in res_dict:
        del res_dict["Tags"]  # uncovered
    for null_field in [key for key, value in res_dict.items() if value is None]:
        del res_dict[null_field]
    return res_dict


@patch(SecretsManagerBackend.update_secret)
def backend_update_secret(
    fn,
    self,
    secret_id,
    description=None,
    **kwargs,
):
    if secret_id not in self.secrets:
        raise SecretNotFoundException()  # uncovered

    if self.secrets[secret_id].is_deleted():
        raise InvalidRequestException(  # uncovered
            "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
            "You can't perform this operation on the secret because it was marked for deletion."
        )

    secret = self.secrets[secret_id]
    version_id_t0 = secret.default_version_id

    requires_new_version: bool = any(
        [kwargs.get("kms_key_id"), kwargs.get("secret_binary"), kwargs.get("secret_string")]
    )
    if requires_new_version:
        fn(self, secret_id, **kwargs)

    if description is not None:
        secret.description = description

    version_id_t1 = secret.default_version_id

    resp: UpdateSecretResponse = UpdateSecretResponse()
    resp["ARN"] = secret.arn
    resp["Name"] = secret.name

    if version_id_t0 != version_id_t1:
        resp["VersionId"] = version_id_t1

    return json.dumps(resp)


@patch(SecretsManagerBackend.tag_resource)
def backend_tag_resource(fn, self, secret_id, tags):
    if secret_id not in self.secrets:
        raise SecretNotFoundException()  # uncovered

    if self.secrets[secret_id].is_deleted():
        raise InvalidRequestException(
            "You can't perform this operation on the secret because it was marked for deletion."
        )

    return fn(self, secret_id, tags)


@patch(SecretsManagerBackend.untag_resource)
def backend_untag_resource(fn, self, secret_id, tag_keys):
    if secret_id not in self.secrets:
        raise SecretNotFoundException()  # uncovered

    if self.secrets[secret_id].is_deleted():
        raise InvalidRequestException(
            "You can't perform this operation on the secret because it was marked for deletion."
        )

    return fn(self, secret_id, tag_keys)

@patch(SecretsManagerResponse.update_secret, pass_target=False)
def response_update_secret(self):
    secret_id = self._get_param("SecretId")
    description = self._get_param("Description")
    secret_string = self._get_param("SecretString")
    secret_binary = self._get_param("SecretBinary")
    client_request_token = self._get_param("ClientRequestToken")
    kms_key_id = self._get_param("KmsKeyId")
    return self.backend.update_secret(
        secret_id=secret_id,
        description=description,
        secret_string=secret_string,
        secret_binary=secret_binary,
        client_request_token=client_request_token,
        kms_key_id=kms_key_id,
    )


@patch(SecretsManagerBackend.update_secret_version_stage)
def backend_update_secret_version_stage(
    fn, self, secret_id, version_stage, remove_from_version_id, move_to_version_id
):
    fn(self, secret_id, version_stage, remove_from_version_id, move_to_version_id)

    secret = self.secrets[secret_id]

    # Patch: default version is the new AWSCURRENT version
    if version_stage == AWSCURRENT:
        secret.default_version_id = move_to_version_id

    versions_no_stages = []
    for version_id, version in secret.versions.items():
        version_stages = version["version_stages"]

        # moto appends a new AWSPREVIOUS label to the version AWSCURRENT was removed from,
        # but it does not remove the old AWSPREVIOUS label.
        # Patch: ensure only one AWSPREVIOUS tagged version is in the pool.
        if (
            version_stage == AWSCURRENT
            and version_id != remove_from_version_id
            and AWSPREVIOUS in version_stages
        ):
            version_stages.remove(AWSPREVIOUS)

        if not version_stages:
            versions_no_stages.append(version_id)

    # Patch: remove secret versions with no version stages.
    for version_no_stages in versions_no_stages:
        del secret.versions[version_no_stages]

    return secret.arn, secret.name


@patch(FakeSecret.reset_default_version)
def fake_secret_reset_default_version(fn, self, secret_version, version_id):
    fn(self, secret_version, version_id)

    # Remove versions with no version stages, if max limit of outdated versions is exceeded.
    versions_no_stages: list[str] = [
        version_id for version_id, version in self.versions.items() if not version["version_stages"]
    ]
    versions_to_delete: list[str] = []

    # Patch: remove outdated versions if the max deprecated versions limit is exceeded.
    if len(versions_no_stages) >= MAX_OUTDATED_SECRET_VERSIONS:
        versions_to_delete = versions_no_stages[
            : len(versions_no_stages) - MAX_OUTDATED_SECRET_VERSIONS
        ]

    for version_to_delete in versions_to_delete:
        del self.versions[version_to_delete]


@patch(FakeSecret.remove_version_stages_from_old_versions)
def fake_secret_remove_version_stages_from_old_versions(fn, self, version_stages):
    fn(self, version_stages)
    # Remove versions with no version stages.
    versions_no_stages = [
        version_id for version_id, version in self.versions.items() if not version["version_stages"]
    ]
    for version_no_stages in versions_no_stages:
        del self.versions[version_no_stages]


# Moto does not support rotate_immediately as an API parameter while the AWS API does
@patch(SecretsManagerResponse.rotate_secret, pass_target=False)
def rotate_secret(self) -> str:
    client_request_token = self._get_param("ClientRequestToken")
    rotation_lambda_arn = self._get_param("RotationLambdaARN")
    rotation_rules = self._get_param("RotationRules")
    rotate_immediately = self._get_param("RotateImmediately")
    secret_id = self._get_param("SecretId")
    return self.backend.rotate_secret(
        secret_id=secret_id,
        client_request_token=client_request_token,
        rotation_lambda_arn=rotation_lambda_arn,
        rotation_rules=rotation_rules,
        rotate_immediately=True if rotate_immediately is None else rotate_immediately,
    )


@patch(SecretsManagerBackend.rotate_secret)
def backend_rotate_secret(
    _,
    self,
    secret_id,
    client_request_token=None,
    rotation_lambda_arn=None,
    rotation_rules=None,
    rotate_immediately=True,
):
    rotation_days = "AutomaticallyAfterDays"

    if not self._is_valid_identifier(secret_id):
        raise SecretNotFoundException()

    secret = self.secrets[secret_id]
    if secret.is_deleted():
        raise InvalidRequestException(
            "An error occurred (InvalidRequestException) when calling the RotateSecret operation: "
            "You tried to perform the operation on a secret that's currently marked deleted."
        )
    # Resolve rotation_lambda_arn, falling back to the previous value if it's missing
    # from the current request.
    rotation_lambda_arn = rotation_lambda_arn or secret.rotation_lambda_arn
    if not rotation_lambda_arn:
        raise InvalidRequestException(
            "No Lambda rotation function ARN is associated with this secret."
        )

    if rotation_lambda_arn:
        if len(rotation_lambda_arn) > 2048:
            msg = "RotationLambdaARN must <= 2048 characters long."
            raise InvalidParameterException(msg)

    # If the current request provides no rotation period, fall back to the
    # previously configured auto_rotate_after_days value (or 0 if none is set).
    rotation_period = secret.auto_rotate_after_days or 0
    if rotation_rules:
        if rotation_days in rotation_rules:
            rotation_period = rotation_rules[rotation_days]
            if rotation_period < 1 or rotation_period > 1000:
                msg = "RotationRules.AutomaticallyAfterDays must be within 1-1000."
                raise InvalidParameterException(msg)

    try:
        lm_client = connect_to(region_name=self.region_name).lambda_
        lm_client.get_function(FunctionName=rotation_lambda_arn)
    except Exception:
        raise ResourceNotFoundException("Lambda does not exist or could not be accessed")

    # The rotation function must end with the versions of the secret in
    # one of two states:
    #
    #  - The AWSPENDING and AWSCURRENT staging labels are attached to the
    #    same version of the secret, or
    #  - The AWSPENDING staging label is not attached to any version of the secret.
    #
    # If the AWSPENDING staging label is present but not attached to the same
    # version as AWSCURRENT then any later invocation of RotateSecret assumes
    # that a previous rotation request is still in progress and returns an error.
    try:
        pending_version = None
        version = next(
            version
            for version in secret.versions.values()
            if AWSPENDING in version["version_stages"]
        )
        if AWSCURRENT not in version["version_stages"]:
            msg = "Previous rotation request is still in progress."
            # Delay exception, so we can trigger lambda again
            pending_version = [InvalidRequestException(msg), version]

    except StopIteration:
        # Pending is not present in any version
        pass

    secret.rotation_lambda_arn = rotation_lambda_arn
    secret.auto_rotate_after_days = rotation_period
    if secret.auto_rotate_after_days > 0:
        wait_interval_s = int(rotation_period) * 86400
        secret.next_rotation_date = int(time.time()) + wait_interval_s
        secret.rotation_enabled = True
        secret.rotation_requested = True

    if rotate_immediately:
        if not pending_version:
            # Begin the rotation process for the given secret by invoking the lambda function.
            #
            # We add the new secret version as "pending". The previous version remains
            # as "current" for now. Once we've passed the new secret through the lambda
            # rotation function (if provided) we can then update the status to "current".
            new_version_id = self._from_client_request_token(client_request_token)

            # An initial dummy secret value is necessary, otherwise moto does not add the new
            # secret version.
            self._add_secret(
                secret_id,
                "dummy_password",
                description=secret.description,
                tags=secret.tags,
                version_id=new_version_id,
                version_stages=[AWSPENDING],
            )

            # AWS secret rotation function templates have checks on existing values so we remove
            # the dummy value to force the lambda to generate a new one.
            del secret.versions[new_version_id]["secret_string"]
        else:
            new_version_id = pending_version.pop()["version_id"]

        try:
            for step in ["create", "set", "test", "finish"]:
                resp = lm_client.invoke(
                    FunctionName=rotation_lambda_arn,
                    Payload=json.dumps(
                        {
                            "Step": step + "Secret",
                            "SecretId": secret.name,
                            "ClientRequestToken": new_version_id,
                        }
                    ),
                )
                if resp.get("FunctionError"):
                    data = json.loads(resp.get("Payload").read())
                    raise Exception(data.get("errorType"))
        except Exception as e:
            LOG.debug("An exception (%s) has occurred in %s", str(e), rotation_lambda_arn)
            if pending_version:
                raise pending_version.pop()
            # Fall through if there is no previously pending version, so we are left with a
            # new secret version stuck in the AWSPENDING state.
    secret.last_rotation_date = int(time.time())
    return secret.to_short_dict(version_id=new_version_id)


@patch(moto_exception.SecretNotFoundException.__init__)
def moto_secret_not_found_exception_init(fn, self):
    fn(self)
    self.code = 400


@patch(FakeSecret._form_version_ids_to_stages, pass_target=False)
def _form_version_ids_to_stages_modal(self):
    version_id_to_stages: dict[str, list] = {}
    for key, value in self.versions.items():
        # Patch: include version_stages in the response only if it is not empty.
        if len(value["version_stages"]) > 0:
            version_id_to_stages[key] = value["version_stages"]
    return version_id_to_stages


# patching resource policy in moto
def get_resource_policy_model(self, secret_id):
    if self._is_valid_identifier(secret_id):
        result = {
            "ARN": self.secrets[secret_id].arn,
            "Name": self.secrets[secret_id].secret_id,
        }
        policy = getattr(self.secrets[secret_id], "policy", None)
        if policy:
            result["ResourcePolicy"] = policy
        return json.dumps(result)
    else:
        raise SecretNotFoundException()


def get_resource_policy_response(self):
    secret_id = self._get_param("SecretId")
    return self.backend.get_resource_policy(secret_id=secret_id)


def decode_secret_binary_from_response(response: dict[str, Any]):
    if "SecretBinary" in response:
        response["SecretBinary"] = base64.b64decode(response["SecretBinary"])

    return response


def delete_resource_policy_model(self, secret_id):
    if self._is_valid_identifier(secret_id):
        self.secrets[secret_id].policy = None
        return json.dumps(
            {
                "ARN": self.secrets[secret_id].arn,
                "Name": self.secrets[secret_id].secret_id,
            }
        )
    else:
        raise SecretNotFoundException()


def delete_resource_policy_response(self):
    secret_id = self._get_param("SecretId")
    return self.backend.delete_resource_policy(secret_id=secret_id)


def put_resource_policy_model(self, secret_id, resource_policy):
    policy_validator = IAMPolicyDocumentValidator(resource_policy)
    policy_validator._validate_top_elements()
    policy_validator._validate_version_syntax()
    if self._is_valid_identifier(secret_id):
        self.secrets[secret_id].policy = resource_policy
        return json.dumps(
            {
                "ARN": self.secrets[secret_id].arn,
                "Name": self.secrets[secret_id].secret_id,
            }
        )
    else:
        raise SecretNotFoundException()


def put_resource_policy_response(self):
    secret_id = self._get_param("SecretId")
    resource_policy = self._get_param("ResourcePolicy")
    return self.backend.put_resource_policy(
        secret_id=secret_id, resource_policy=json.loads(resource_policy)
    )


def apply_patches():
    SecretsManagerBackend.get_resource_policy = get_resource_policy_model
    SecretsManagerResponse.get_resource_policy = get_resource_policy_response

    if not hasattr(SecretsManagerBackend, "delete_resource_policy"):
        SecretsManagerBackend.delete_resource_policy = delete_resource_policy_model
    if not hasattr(SecretsManagerResponse, "delete_resource_policy"):
        SecretsManagerResponse.delete_resource_policy = delete_resource_policy_response
    if not hasattr(SecretsManagerBackend, "put_resource_policy"):
        SecretsManagerBackend.put_resource_policy = put_resource_policy_model
    if not hasattr(SecretsManagerResponse, "put_resource_policy"):
        SecretsManagerResponse.put_resource_policy = put_resource_policy_response