• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / b810cb22-0258-4411-b264-2fe3c023cb59

12 Mar 2025 08:50PM UTC coverage: 86.944% (+0.03%) from 86.915%
b810cb22-0258-4411-b264-2fe3c023cb59

push

circleci

web-flow
fix APIGW binary media types (#12371)

2 of 2 new or added lines in 1 file covered. (100.0%)

45 existing lines in 8 files now uncovered.

62242 of 71589 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.94
/localstack-core/localstack/services/iam/provider.py
1
import inspect
1✔
2
import json
1✔
3
import logging
1✔
4
import random
1✔
5
import re
1✔
6
import string
1✔
7
from datetime import datetime
1✔
8
from typing import Any, Dict, List, TypeVar
1✔
9
from urllib.parse import quote
1✔
10

11
from moto.iam.models import (
1✔
12
    IAMBackend,
13
    filter_items_with_path_prefix,
14
    iam_backends,
15
)
16
from moto.iam.models import Role as MotoRole
1✔
17
from moto.iam.models import User as MotoUser
1✔
18
from moto.iam.utils import generate_access_key_id_from_account_id
1✔
19

20
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
21
from localstack.aws.api.iam import (
1✔
22
    ActionNameListType,
23
    ActionNameType,
24
    AttachedPermissionsBoundary,
25
    ContextEntryListType,
26
    CreateRoleRequest,
27
    CreateRoleResponse,
28
    CreateServiceLinkedRoleResponse,
29
    CreateServiceSpecificCredentialResponse,
30
    CreateUserResponse,
31
    DeleteConflictException,
32
    DeleteServiceLinkedRoleResponse,
33
    DeletionTaskIdType,
34
    DeletionTaskStatusType,
35
    EvaluationResult,
36
    GetServiceLinkedRoleDeletionStatusResponse,
37
    GetUserResponse,
38
    IamApi,
39
    ListInstanceProfileTagsResponse,
40
    ListRolesResponse,
41
    ListServiceSpecificCredentialsResponse,
42
    MalformedPolicyDocumentException,
43
    NoSuchEntityException,
44
    PolicyEvaluationDecisionType,
45
    ResetServiceSpecificCredentialResponse,
46
    ResourceHandlingOptionType,
47
    ResourceNameListType,
48
    ResourceNameType,
49
    Role,
50
    ServiceSpecificCredential,
51
    ServiceSpecificCredentialMetadata,
52
    SimulatePolicyResponse,
53
    SimulationPolicyListType,
54
    Tag,
55
    User,
56
    arnType,
57
    customSuffixType,
58
    existingUserNameType,
59
    groupNameType,
60
    instanceProfileNameType,
61
    markerType,
62
    maxItemsType,
63
    pathPrefixType,
64
    pathType,
65
    policyDocumentType,
66
    roleDescriptionType,
67
    roleNameType,
68
    serviceName,
69
    serviceSpecificCredentialId,
70
    statusType,
71
    tagKeyListType,
72
    tagListType,
73
    userNameType,
74
)
75
from localstack.aws.connect import connect_to
1✔
76
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
77
from localstack.services.iam.iam_patches import apply_iam_patches
1✔
78
from localstack.services.moto import call_moto
1✔
79
from localstack.utils.aws.request_context import extract_access_key_id_from_auth_header
1✔
80
from localstack.utils.common import short_uid
1✔
81

82
LOG = logging.getLogger(__name__)
1✔
83

84
SERVICE_LINKED_ROLE_PATH_PREFIX = "/aws-service-role"
1✔
85

86
POLICY_ARN_REGEX = re.compile(r"arn:[^:]+:iam::(?:\d{12}|aws):policy/.*")
1✔
87

88
CREDENTIAL_ID_REGEX = re.compile(r"^\w+$")
1✔
89

90
T = TypeVar("T")
1✔
91

92

93
class ValidationError(CommonServiceException):
    """Service exception reported with the error code ``ValidationError``."""

    def __init__(self, message: str):
        """
        :param message: Human-readable description of the validation failure.
        """
        error_code = "ValidationError"
        # Positional arguments three and four of the parent constructor;
        # presumably the HTTP status code and the sender-fault flag - confirm
        # against CommonServiceException.
        third_arg, fourth_arg = 400, True
        super().__init__(error_code, message, third_arg, fourth_arg)
1✔
96

97

98
class ValidationListError(ValidationError):
    """Validation error that folds several constraint violations into one message."""

    def __init__(self, validation_errors: list[str]):
        """
        :param validation_errors: Individual violation messages; they are joined with ``"; "``.
        """
        count = len(validation_errors)
        plural_suffix = "s" if count > 1 else ""
        details = "; ".join(validation_errors)
        super().__init__(f"{count} validation error{plural_suffix} detected: {details}")
1✔
102

103

104
def get_iam_backend(context: RequestContext) -> IAMBackend:
    """
    Look up the moto IAM backend for the account and partition of the request.

    :param context: Request context providing ``account_id`` and ``partition``
    :return: The matching entry of ``iam_backends``
    """
    backends_of_account = iam_backends[context.account_id]
    return backends_of_account[context.partition]
1✔
106

107

108
class IamProvider(IamApi):
1✔
109
    def __init__(self):
        """Create the provider and run ``apply_iam_patches`` as part of construction."""
        apply_iam_patches()
1✔
111

112
    @handler("CreateRole", expand=False)
    def create_role(
        self, context: RequestContext, request: CreateRoleRequest
    ) -> CreateRoleResponse:
        """
        Create a role through moto after checking that the trust policy is valid JSON.

        :param context: Request context
        :param request: The unexpanded CreateRole request
        :return: The moto response, with ``MaxSessionDuration`` removed when it was not
            requested and ``RoleLastUsed`` removed when it is empty
        :raises MalformedPolicyDocumentException: if ``AssumeRolePolicyDocument`` is not valid JSON
        """
        trust_policy = request["AssumeRolePolicyDocument"]
        try:
            json.loads(trust_policy)
        except json.JSONDecodeError:
            raise MalformedPolicyDocumentException("This policy contains invalid Json")

        response = call_moto(context)
        role = response["Role"]

        # Only echo MaxSessionDuration back if the caller explicitly asked for one.
        duration_requested = bool(request.get("MaxSessionDuration"))
        if not duration_requested and role.get("MaxSessionDuration"):
            role.pop("MaxSessionDuration")

        # An empty RoleLastUsed is not part of the AWS response.
        # FIXME: RoleLastUsed did not seem well supported when this check was added
        if "RoleLastUsed" in role and not role["RoleLastUsed"]:
            role.pop("RoleLastUsed")

        return response
1✔
131

132
    @staticmethod
    def build_evaluation_result(
        action_name: ActionNameType, resource_name: ResourceNameType, policy_statements: List[Dict]
    ) -> EvaluationResult:
        """
        Build the simulation result for a single action/resource pair.

        The decision starts as ``explicitDeny`` and becomes ``allowed`` as soon as one
        ``Allow`` statement lists both the action and the resource literally. Wildcards,
        conditions, ``Deny`` statements and ``NotAction``/``NotResource`` are not evaluated.

        :param action_name: Action to evaluate
        :param resource_name: Resource to evaluate the action against
        :param policy_statements: Statements of the policy document
        :return: Evaluation result for the given pair
        """

        def _as_list(value) -> list:
            # Policy elements may be a single string or a list of strings; normalise so that
            # ``in`` is an exact membership test and never a substring match.
            if value is None:
                return []
            if isinstance(value, str):
                return [value]
            return list(value)

        eval_res = EvaluationResult()
        eval_res["EvalActionName"] = action_name
        eval_res["EvalResourceName"] = resource_name
        eval_res["EvalDecision"] = PolicyEvaluationDecisionType.explicitDeny
        for statement in policy_statements:
            # TODO Implement evaluation logic here
            # ``.get`` keeps statements without Action/Resource/Effect (e.g. NotAction) from
            # raising KeyError; such statements simply never match.
            if (
                statement.get("Effect") == "Allow"
                and action_name in _as_list(statement.get("Action"))
                and resource_name in _as_list(statement.get("Resource"))
            ):
                eval_res["EvalDecision"] = PolicyEvaluationDecisionType.allowed
                eval_res["MatchedStatements"] = []  # TODO: add support for statement compilation.
        return eval_res
1✔
150

151
    def simulate_principal_policy(
        self,
        context: RequestContext,
        policy_source_arn: arnType,
        action_names: ActionNameListType,
        policy_input_list: SimulationPolicyListType = None,
        permissions_boundary_policy_input_list: SimulationPolicyListType = None,
        resource_arns: ResourceNameListType = None,
        resource_policy: policyDocumentType = None,
        resource_owner: ResourceNameType = None,
        caller_arn: ResourceNameType = None,
        context_entries: ContextEntryListType = None,
        resource_handling_option: ResourceHandlingOptionType = None,
        max_items: maxItemsType = None,
        marker: markerType = None,
        **kwargs,
    ) -> SimulatePolicyResponse:
        """
        Simulate the default version of a managed policy for every action/resource pair.

        Only ``policy_source_arn``, ``action_names`` and ``resource_arns`` are used; the
        remaining parameters are accepted but ignored.

        :param context: Request context
        :param policy_source_arn: ARN passed to the backend's ``get_policy``
        :param action_names: Actions to evaluate
        :param resource_arns: Resources to evaluate; falls back to ``["*"]`` when omitted
        :return: Response with one evaluation result per action/resource pair, never truncated
        :raises NoSuchEntityException: if the policy document cannot be parsed
        """
        backend = get_iam_backend(context)
        policy = backend.get_policy(policy_source_arn)
        policy_version = backend.get_policy_version(policy_source_arn, policy.default_version_id)
        try:
            policy_statements = json.loads(policy_version.document).get("Statement", [])
        except (ValueError, TypeError, AttributeError) as err:
            # ValueError covers json.JSONDecodeError; TypeError/AttributeError cover a missing
            # document or a document whose top level is not an object.
            raise NoSuchEntityException("Policy not found") from err

        # "Statement" may be a single object instead of a list of objects.
        if isinstance(policy_statements, dict):
            policy_statements = [policy_statements]

        # ResourceArns is optional; without this fallback iterating None raises TypeError.
        resources = resource_arns or ["*"]

        evaluations = [
            self.build_evaluation_result(action_name, resource_arn, policy_statements)
            for action_name in action_names
            for resource_arn in resources
        ]

        response = SimulatePolicyResponse()
        response["IsTruncated"] = False
        response["EvaluationResults"] = evaluations
        return response
1✔
186

187
    def delete_policy(self, context: RequestContext, policy_arn: arnType, **kwargs) -> None:
        """
        Remove a managed policy from the backend.

        :param context: Request context
        :param policy_arn: ARN of the policy to delete
        :raises NoSuchEntityException: if no (truthy) policy is stored under the ARN
        """
        managed_policies = get_iam_backend(context).managed_policies
        if not managed_policies.get(policy_arn):
            raise NoSuchEntityException(f"Policy {policy_arn} was not found.")
        managed_policies.pop(policy_arn, None)
1✔
193

194
    def detach_role_policy(
        self, context: RequestContext, role_name: roleNameType, policy_arn: arnType, **kwargs
    ) -> None:
        """
        Detach a managed policy from a role.

        :param context: Request context
        :param role_name: Name of the role
        :param policy_arn: ARN of the policy to detach
        :raises NoSuchEntityException: if a ``KeyError`` occurs while looking up or detaching the policy
        """
        iam = get_iam_backend(context)
        try:
            moto_role = iam.get_role(role_name)
            attached_policy = moto_role.managed_policies[policy_arn]
            attached_policy.detach_from(moto_role)
        except KeyError:
            raise NoSuchEntityException(f"Policy {policy_arn} was not found.")
×
204

205
    @staticmethod
    def moto_role_to_role_type(moto_role: MotoRole) -> Role:
        """
        Convert a moto role object into the API ``Role`` structure.

        Mandatory fields are always copied; optional fields are only set when the
        corresponding moto attribute is truthy.

        :param moto_role: Role object from the moto backend
        :return: API representation of the role
        """
        role = Role(
            Path=moto_role.path,
            RoleName=moto_role.name,
            RoleId=moto_role.id,
            Arn=moto_role.arn,
            CreateDate=moto_role.create_date,
        )
        optional_fields = (
            ("AssumeRolePolicyDocument", moto_role.assume_role_policy_document),
            ("Description", moto_role.description),
            ("MaxSessionDuration", moto_role.max_session_duration),
            ("PermissionsBoundary", moto_role.permissions_boundary),
        )
        for field_name, field_value in optional_fields:
            if field_value:
                role[field_name] = field_value
        role_tags = moto_role.tags
        if role_tags:
            role["Tags"] = [Tag(Key=key, Value=value) for key, value in role_tags.items()]
        # role["RoleLastUsed"]: # TODO: add support
        return role
1✔
225

226
    def list_roles(
        self,
        context: RequestContext,
        path_prefix: pathPrefixType = None,
        marker: markerType = None,
        max_items: maxItemsType = None,
        **kwargs,
    ) -> ListRolesResponse:
        """
        List the roles of the backend, sorted by role id.

        ``marker`` and ``max_items`` are not used; the result is never truncated.

        :param context: Request context
        :param path_prefix: Optional path prefix used to filter the roles
        :return: Response containing all matching roles without their permissions boundary
        """
        candidates = get_iam_backend(context).roles.values()
        if path_prefix:
            candidates = filter_items_with_path_prefix(path_prefix, candidates)

        listed_roles = []
        for moto_role in sorted(candidates, key=lambda item: item.id):
            entry = self.moto_role_to_role_type(moto_role)
            # Permission boundary should not be a part of the response
            entry.pop("PermissionsBoundary", None)
            if path_prefix:  # TODO: this is consistent with the patch it migrates, but should add tests for this.
                serialized_policy = json.dumps(moto_role.assume_role_policy_document or {})
                entry["AssumeRolePolicyDocument"] = quote(serialized_policy)
            listed_roles.append(entry)

        return ListRolesResponse(Roles=listed_roles, IsTruncated=False)
1✔
252

253
    def update_group(
        self,
        context: RequestContext,
        group_name: groupNameType,
        new_path: pathType = None,
        new_group_name: groupNameType = None,
        **kwargs,
    ) -> None:
        """
        Change the name and/or path of a group.

        :param context: Request context
        :param group_name: Current name of the group
        :param new_path: New path; the existing path is kept when omitted
        :param new_group_name: New name; the existing name is kept when omitted
        """
        new_group_name = new_group_name or group_name
        backend = get_iam_backend(context)
        group = backend.get_group(group_name)
        # Only touch the path when one was supplied - otherwise a pure rename would
        # overwrite the existing path with None.
        if new_path is not None:
            group.path = new_path
        group.name = new_group_name
        # Re-key the backend entry so that the group can be found under its new name.
        backend.groups[new_group_name] = backend.groups.pop(group_name)
×
267

268
    def list_instance_profile_tags(
        self,
        context: RequestContext,
        instance_profile_name: instanceProfileNameType,
        marker: markerType = None,
        max_items: maxItemsType = None,
        **kwargs,
    ) -> ListInstanceProfileTagsResponse:
        """
        Return all tags of an instance profile.

        ``marker`` and ``max_items`` are not used.

        :param context: Request context
        :param instance_profile_name: Name of the instance profile
        :return: Response containing one ``Tag`` per key/value pair of the profile
        """
        profile = get_iam_backend(context).get_instance_profile(instance_profile_name)
        profile_tags = [Tag(Key=key, Value=value) for key, value in profile.tags.items()]
        return ListInstanceProfileTagsResponse(Tags=profile_tags)
1✔
281

282
    def tag_instance_profile(
        self,
        context: RequestContext,
        instance_profile_name: instanceProfileNameType,
        tags: tagListType,
        **kwargs,
    ) -> None:
        """
        Add or overwrite tags on an instance profile.

        :param context: Request context
        :param instance_profile_name: Name of the instance profile
        :param tags: Tags to set; existing keys are overwritten
        """
        profile = get_iam_backend(context).get_instance_profile(instance_profile_name)
        # Build the complete mapping first so a malformed tag fails before anything is stored.
        new_tags = dict((tag["Key"], tag["Value"]) for tag in tags)
        profile.tags.update(new_tags)
1✔
293

294
    def untag_instance_profile(
        self,
        context: RequestContext,
        instance_profile_name: instanceProfileNameType,
        tag_keys: tagKeyListType,
        **kwargs,
    ) -> None:
        """
        Remove tags from an instance profile; unknown keys are ignored.

        :param context: Request context
        :param instance_profile_name: Name of the instance profile
        :param tag_keys: Keys of the tags to remove
        """
        profile = get_iam_backend(context).get_instance_profile(instance_profile_name)
        for key_to_remove in tag_keys:
            profile.tags.pop(key_to_remove, None)
1✔
305

306
    def create_service_linked_role(
        self,
        context: RequestContext,
        aws_service_name: groupNameType,
        description: roleDescriptionType = None,
        custom_suffix: customSuffixType = None,
        **kwargs,
    ) -> CreateServiceLinkedRoleResponse:
        """
        Create a role below the service-linked role path that the given service may assume.

        ``custom_suffix`` is currently not used.

        :param context: Request context
        :param aws_service_name: Service principal allowed to assume the role
        :param description: Optional role description
        :return: Response wrapping the created role
        """
        # TODO: test
        # TODO: how to support "CustomSuffix" API request parameter?
        trust_statement = {
            "Effect": "Allow",
            "Principal": {"Service": aws_service_name},
            "Action": "sts:AssumeRole",
        }
        trust_policy = json.dumps({"Version": "2012-10-17", "Statement": [trust_statement]})
        role_path = f"{SERVICE_LINKED_ROLE_PATH_PREFIX}/{aws_service_name}"
        generated_name = f"r-{short_uid()}"

        moto_role = get_iam_backend(context).create_role(
            role_name=generated_name,
            assume_role_policy_document=trust_policy,
            path=role_path,
            permissions_boundary="",
            description=description,
            tags={},
            max_session_duration=3600,
        )
        moto_role.service_linked_role_arn = (
            f"arn:{context.partition}:iam::{context.account_id}"
            f":role/aws-service-role/{aws_service_name}/{moto_role.name}"
        )

        return CreateServiceLinkedRoleResponse(Role=self.moto_role_to_role_type(moto_role))
1✔
346

347
    def delete_service_linked_role(
        self, context: RequestContext, role_name: roleNameType, **kwargs
    ) -> DeleteServiceLinkedRoleResponse:
        """
        Delete the role from the backend and hand out a freshly generated deletion task id.

        :param context: Request context
        :param role_name: Name of the role to delete
        :return: Response containing the generated task id
        """
        # TODO: test
        get_iam_backend(context).delete_role(role_name)
        task_id = short_uid()
        return DeleteServiceLinkedRoleResponse(DeletionTaskId=task_id)
1✔
354

355
    def get_service_linked_role_deletion_status(
        self, context: RequestContext, deletion_task_id: DeletionTaskIdType, **kwargs
    ) -> GetServiceLinkedRoleDeletionStatusResponse:
        """
        Report the status of a deletion task; the task id is not inspected and the
        status is always ``SUCCEEDED``.

        :param context: Request context
        :param deletion_task_id: Id of the deletion task (unused)
        :return: Response with status ``SUCCEEDED``
        """
        # TODO: test
        task_status = DeletionTaskStatusType.SUCCEEDED
        return GetServiceLinkedRoleDeletionStatusResponse(Status=task_status)
×
360

361
    def put_user_permissions_boundary(
        self,
        context: RequestContext,
        user_name: userNameType,
        permissions_boundary: arnType,
        **kwargs,
    ) -> None:
        """
        Store a permissions boundary on a user.

        :param context: Request context
        :param user_name: Name of the user
        :param permissions_boundary: ARN of the policy to use as permissions boundary
        :raises NoSuchEntityException: if the user cannot be found
        """
        # Shared lookup so that the "not found" error carries a descriptive message
        # instead of an empty one.
        user = self._get_user_or_raise_error(user_name, context)
        user.permissions_boundary = permissions_boundary
×
372

373
    def delete_user_permissions_boundary(
        self, context: RequestContext, user_name: userNameType, **kwargs
    ) -> None:
        """
        Remove the permissions boundary from a user, if one is set.

        :param context: Request context
        :param user_name: Name of the user
        :raises NoSuchEntityException: if the user cannot be found
        """
        # Shared lookup so that the "not found" error carries a descriptive message
        # instead of an empty one.
        user = self._get_user_or_raise_error(user_name, context)
        if hasattr(user, "permissions_boundary"):
            delattr(user, "permissions_boundary")
×
381

382
    def create_user(
        self,
        context: RequestContext,
        user_name: userNameType,
        path: pathType = None,
        permissions_boundary: arnType = None,
        tags: tagListType = None,
        **kwargs,
    ) -> CreateUserResponse:
        """
        Create a user through moto and attach the permissions boundary, if one was given.

        :param context: Request context
        :param user_name: Name of the new user
        :param permissions_boundary: Optional ARN stored on the user and echoed in the response
        :return: The moto response, extended by ``PermissionsBoundary`` when applicable
        """
        response = call_moto(context=context)
        created_user = get_iam_backend(context).get_user(user_name)
        if not permissions_boundary:
            return response

        created_user.permissions_boundary = permissions_boundary
        boundary = AttachedPermissionsBoundary(
            PermissionsBoundaryArn=permissions_boundary,
            PermissionsBoundaryType="Policy",
        )
        response["User"]["PermissionsBoundary"] = boundary
        return response
1✔
400

401
    def get_user(
        self, context: RequestContext, user_name: existingUserNameType = None, **kwargs
    ) -> GetUserResponse:
        """
        Return a user, resolving the caller identity when no user name was given.

        :param context: Request context
        :param user_name: Optional name of the user to look up
        :return: The moto response (plus permissions boundary, if set), or a synthetic
            user entry when called without user name and the caller ARN ends in ``:root``
        :raises CommonServiceException: ``ValidationError`` if no user name was given and
            the caller is neither a known user nor root
        """
        response = call_moto(context=context)
        resolved_name = response["User"]["UserName"]
        moto_user = get_iam_backend(context).users.get(resolved_name)

        # if the user does not exist or is no user
        if not (moto_user or user_name):
            key_id = extract_access_key_id_from_auth_header(context.request.headers)
            sts_client = connect_to(
                region_name=context.region,
                aws_access_key_id=key_id,
                aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
            ).sts
            caller_arn = sts_client.get_caller_identity()["Arn"]
            if not caller_arn.endswith(":root"):
                raise CommonServiceException(
                    "ValidationError",
                    "Must specify userName when calling with non-User credentials",
                )
            root_user = User(
                UserId=context.account_id,
                Arn=caller_arn,
                CreateDate=datetime.now(),
                PasswordLastUsed=datetime.now(),
            )
            return GetUserResponse(User=root_user)

        boundary_arn = getattr(moto_user, "permissions_boundary", None)
        if boundary_arn:
            response["User"]["PermissionsBoundary"] = AttachedPermissionsBoundary(
                PermissionsBoundaryArn=boundary_arn,
                PermissionsBoundaryType="Policy",
            )

        return response
1✔
439

440
    def delete_user(
        self, context: RequestContext, user_name: existingUserNameType, **kwargs
    ) -> None:
        """
        Delete a user through moto unless service specific credentials are still attached.

        :param context: Request context
        :param user_name: Name of the user to delete
        :raises DeleteConflictException: if the user still has service specific credentials
        """
        moto_user = get_iam_backend(context).users.get(user_name)
        has_credentials = bool(moto_user and moto_user.service_specific_credentials)
        if has_credentials:
            LOG.info(
                "Cannot delete user '%s' because service specific credentials are still present.",
                user_name,
            )
            raise DeleteConflictException(
                "Cannot delete entity, must remove referenced objects first."
            )
        return call_moto(context=context)
1✔
453

454
    def attach_role_policy(
        self, context: RequestContext, role_name: roleNameType, policy_arn: arnType, **kwargs
    ) -> None:
        """
        Attach a managed policy to a role after validating the shape of the policy ARN.

        :param context: Request context
        :param role_name: Name of the role
        :param policy_arn: ARN of the policy to attach
        :raises ValidationError: if the ARN does not match ``POLICY_ARN_REGEX``
        """
        arn_is_valid = POLICY_ARN_REGEX.match(policy_arn) is not None
        if arn_is_valid:
            return call_moto(context=context)
        raise ValidationError("Invalid ARN:  Could not be parsed!")
1✔
460

461
    def attach_user_policy(
        self, context: RequestContext, user_name: userNameType, policy_arn: arnType, **kwargs
    ) -> None:
        """
        Attach a managed policy to a user after validating the shape of the policy ARN.

        :param context: Request context
        :param user_name: Name of the user
        :param policy_arn: ARN of the policy to attach
        :raises ValidationError: if the ARN does not match ``POLICY_ARN_REGEX``
        """
        arn_is_valid = POLICY_ARN_REGEX.match(policy_arn) is not None
        if arn_is_valid:
            return call_moto(context=context)
        raise ValidationError("Invalid ARN:  Could not be parsed!")
1✔
467

468
    # ------------------------------ Service specific credentials ------------------------------ #
469

470
    def _get_user_or_raise_error(self, user_name: str, context: RequestContext) -> MotoUser:
        """
        Fetch the moto user with the given name, failing with the proper exception otherwise.

        :param user_name: Username to find
        :param context: Request context
        :return: A moto user object
        :raises NoSuchEntityException: if no user is stored under the name
        """
        if found_user := get_iam_backend(context).users.get(user_name):
            return found_user
        raise NoSuchEntityException(f"The user with name {user_name} cannot be found.")
1✔
482

483
    def _validate_service_name(self, service_name: str) -> None:
1✔
484
        """
485
        Validate if the service provided is supported.
486

487
        :param service_name: Service name to check
488
        """
489
        if service_name not in ["codecommit.amazonaws.com", "cassandra.amazonaws.com"]:
1✔
490
            raise NoSuchEntityException(
1✔
491
                f"No such service {service_name} is supported for Service Specific Credentials"
492
            )
493

494
    def _validate_credential_id(self, credential_id: str) -> None:
        """
        Validate if the credential id is correctly formed.

        :param credential_id: Credential ID to check
        :raises ValidationListError: if the id does not consist solely of word characters
        """
        # fullmatch instead of match: "$" in the pattern also matches right before a
        # trailing newline, so match() would accept ids such as "ABC\n".
        if not CREDENTIAL_ID_REGEX.fullmatch(credential_id):
            raise ValidationListError(
                [
                    "Value at 'serviceSpecificCredentialId' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w]+"
                ]
            )
506

507
    def _generate_service_password(self):
1✔
508
        """
509
        Generate a new service password for a service specific credential.
510

511
        :return: 60 letter password ending in `=`
512
        """
513
        password_charset = string.ascii_letters + string.digits + "+/"
1✔
514
        # password always ends in = for some reason - but it is not base64
515
        return "".join(random.choices(password_charset, k=59)) + "="
1✔
516

517
    def _generate_credential_id(self, context: RequestContext):
        """
        Generate a credential ID.
        Credentials are structured like access key ids and also carry the encoded account id.
        Example: `ACCAQAAAAAAAPBAFQJI5W` for account `000000000000`

        :param context: Request context (to extract account id)
        :return: New credential id.
        """
        owning_account = context.account_id
        return generate_access_key_id_from_account_id(
            owning_account, prefix="ACCA", total_length=21
        )
529

530
    def _new_service_specific_credential(
        self, user_name: str, service_name: str, context: RequestContext
    ) -> ServiceSpecificCredential:
        """
        Build a fresh, active service specific credential for a user and service.

        :param user_name: Username the credential will be assigned to.
        :param service_name: Service the credential will be used for.
        :param context: Request context, used to extract the account id.
        :return: New ServiceSpecificCredential
        """
        new_password = self._generate_service_password()
        new_id = self._generate_credential_id(context)
        created_at = datetime.now()
        service_user = f"{user_name}-at-{context.account_id}"
        return ServiceSpecificCredential(
            CreateDate=created_at,
            ServiceName=service_name,
            ServiceUserName=service_user,
            ServicePassword=new_password,
            ServiceSpecificCredentialId=new_id,
            UserName=user_name,
            Status=statusType.Active,
        )
552

553
    def _find_credential_in_user_by_id(
        self, user_name: str, credential_id: str, context: RequestContext
    ) -> ServiceSpecificCredential:
        """
        Look up a credential of a user by its id.
        Raises errors if the user or credential is not found.

        :param user_name: Username of the user the credential is assigned to.
        :param credential_id: Credential ID to check
        :param context: Request context (used to determine account and region)
        :return: Service specific credential (the first one with a matching id)
        """
        moto_user = self._get_user_or_raise_error(user_name, context)
        self._validate_credential_id(credential_id)
        candidates = [
            candidate
            for candidate in moto_user.service_specific_credentials
            if candidate["ServiceSpecificCredentialId"] == credential_id
        ]
        if candidates:
            return candidates[0]
        raise NoSuchEntityException(f"No such credential {credential_id} exists")
1✔
575

576
    def _validate_status(self, status: str):
        """
        Check that the status is a member of ``statusType``.
        Raises a ValidationError if the status is invalid.

        :param status: Status to check
        """
        constraint_violation = (
            "Value at 'status' failed to satisfy constraint: Member must satisfy enum value set"
        )
        try:
            statusType(status)
        except ValueError:
            raise ValidationListError([constraint_violation])
591

592
    def build_dict_with_only_defined_keys(
        self, data: dict[str, Any], typed_dict_type: type[T]
    ) -> T:
        """
        Reduce a dict to the keys annotated on a given typed dict.
        Only the first level is filtered; nested values are passed through untouched.

        :param data: Dict to filter.
        :param typed_dict_type: TypedDict subtype containing the attributes allowed to be present in the return value
        :return: shallow copy of the data only containing the keys defined on typed_dict_type
        """
        allowed_keys = inspect.get_annotations(typed_dict_type).keys()
        filtered = {}
        for key, value in data.items():
            if key in allowed_keys:
                filtered[key] = value
        return filtered
1✔
605

606
    def create_service_specific_credential(
        self, context: RequestContext, user_name: userNameType, service_name: serviceName, **kwargs
    ) -> CreateServiceSpecificCredentialResponse:
        """
        Create a service specific credential and store it on the user.

        :param context: Request context
        :param user_name: Name of the user the credential is created for
        :param service_name: Service the credential is meant for
        :return: Response containing the new credential
        """
        owner = self._get_user_or_raise_error(user_name, context)
        self._validate_service_name(service_name)
        new_credential = self._new_service_specific_credential(user_name, service_name, context)
        owner.service_specific_credentials.append(new_credential)
        return CreateServiceSpecificCredentialResponse(ServiceSpecificCredential=new_credential)
1✔
614

615
    def list_service_specific_credentials(
        self,
        context: RequestContext,
        user_name: userNameType = None,
        service_name: serviceName = None,
        **kwargs,
    ) -> ListServiceSpecificCredentialsResponse:
        """
        List the service specific credentials of a user.

        :param context: Request context
        :param user_name: Name of the user whose credentials are listed
        :param service_name: Optional service filter; when omitted, credentials of all
            services are returned
        :return: Response with the metadata (keys of ``ServiceSpecificCredentialMetadata``)
            of every matching credential
        """
        moto_user = self._get_user_or_raise_error(user_name, context)
        # The service filter is optional - only validate and apply it when it was supplied,
        # otherwise a plain listing would fail with "No such service None ...".
        if service_name is not None:
            self._validate_service_name(service_name)
        result = [
            self.build_dict_with_only_defined_keys(creds, ServiceSpecificCredentialMetadata)
            for creds in moto_user.service_specific_credentials
            if service_name is None or creds["ServiceName"] == service_name
        ]
        return ListServiceSpecificCredentialsResponse(ServiceSpecificCredentials=result)
1✔
630

631
    def update_service_specific_credential(
        self,
        context: RequestContext,
        service_specific_credential_id: serviceSpecificCredentialId,
        status: statusType,
        user_name: userNameType = None,
        **kwargs,
    ) -> None:
        """
        Set the status of a service specific credential.

        :param context: Request context
        :param service_specific_credential_id: Id of the credential to update
        :param status: New status; validated before the credential is looked up
        :param user_name: Name of the user owning the credential
        """
        self._validate_status(status)

        target = self._find_credential_in_user_by_id(
            user_name=user_name,
            credential_id=service_specific_credential_id,
            context=context,
        )
        target["Status"] = status
1✔
645

646
    def reset_service_specific_credential(
        self,
        context: RequestContext,
        service_specific_credential_id: serviceSpecificCredentialId,
        user_name: userNameType = None,
        **kwargs,
    ) -> ResetServiceSpecificCredentialResponse:
        """
        Replace the password of a service specific credential with a newly generated one.

        :param context: Request context
        :param service_specific_credential_id: Id of the credential to reset
        :param user_name: Name of the user owning the credential
        :return: Response containing the credential with its new password
        """
        target = self._find_credential_in_user_by_id(
            user_name=user_name,
            credential_id=service_specific_credential_id,
            context=context,
        )
        fresh_password = self._generate_service_password()
        target["ServicePassword"] = fresh_password
        return ResetServiceSpecificCredentialResponse(ServiceSpecificCredential=target)
1✔
658

659
    def delete_service_specific_credential(
        self,
        context: RequestContext,
        service_specific_credential_id: serviceSpecificCredentialId,
        user_name: userNameType = None,
        **kwargs,
    ) -> None:
        """
        Remove a service specific credential from its user.

        :param context: Request context
        :param service_specific_credential_id: Id of the credential to delete
        :param user_name: Name of the user owning the credential
        :raises NoSuchEntityException: if the user or the credential cannot be found
        """
        owner = self._get_user_or_raise_error(user_name, context)
        doomed_credential = self._find_credential_in_user_by_id(
            user_name, service_specific_credential_id, context
        )
        stored_credentials = owner.service_specific_credentials
        try:
            stored_credentials.remove(doomed_credential)
        except ValueError:
            # just in case of race conditions
            missing_message = f"No such credential {service_specific_credential_id} exists"
            raise NoSuchEntityException(missing_message)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc