• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19880423371

02 Dec 2025 08:25PM UTC coverage: 86.905% (-0.04%) from 86.945%
19880423371

push

github

web-flow
fix/external client CA bundle (#13451)

1 of 5 new or added lines in 1 file covered. (20.0%)

414 existing lines in 19 files now uncovered.

69738 of 80246 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.77
/localstack-core/localstack/services/sts/provider.py
1
import logging
1✔
2
import re
1✔
3

4
from localstack.aws.api import CommonServiceException, RequestContext, ServiceException
1✔
5
from localstack.aws.api.sts import (
1✔
6
    AssumeRoleResponse,
7
    GetCallerIdentityResponse,
8
    ProvidedContextsListType,
9
    StsApi,
10
    arnType,
11
    externalIdType,
12
    policyDescriptorListType,
13
    roleDurationSecondsType,
14
    roleSessionNameType,
15
    serialNumberType,
16
    sourceIdentityType,
17
    tagKeyListType,
18
    tagListType,
19
    tokenCodeType,
20
    unrestrictedSessionPolicyDocumentType,
21
)
22
from localstack.services.iam.iam_patches import apply_iam_patches
1✔
23
from localstack.services.moto import call_moto
1✔
24
from localstack.services.plugins import ServiceLifecycleHook
1✔
25
from localstack.services.sts.models import SessionConfig, sts_stores
1✔
26
from localstack.state import StateVisitor
1✔
27
from localstack.utils.aws.arns import extract_account_id_from_arn
1✔
28
from localstack.utils.aws.request_context import extract_access_key_id_from_auth_header
1✔
29

30
LOG = logging.getLogger(__name__)
1✔
31

32

33
class InvalidParameterValueError(ServiceException):
    """Service error reported with code ``InvalidParameterValue`` and HTTP status 400."""

    # flagged as a fault of the sender
    sender_fault = True
    status_code = 400
    code = "InvalidParameterValue"
1✔
37

38

39
# Permissive ARN shape check: exactly six colon-separated fields, of which the fourth
# and fifth may be empty, so e.g. arn:a:a:::aaaaaaaaaa passes the check.
ROLE_ARN_REGEX = re.compile(r"^arn:[^:]+:[^:]+:[^:]*:[^:]*:[^:]+$")
# Session name regex as specified in the error response from AWS ([\w+=,.@-]*).
# re.ASCII restricts \w to [a-zA-Z0-9_] instead of Python's Unicode-aware default, and
# \Z (unlike $) does not match before a trailing newline, so "name\n" is rejected.
SESSION_NAME_REGEX = re.compile(r"^[\w+=,.@-]*\Z", re.ASCII)
1✔
43

44

45
class ValidationError(CommonServiceException):
    """Sender-fault ``ValidationError`` (HTTP 400) carrying a caller-supplied message."""

    def __init__(self, message: str):
        error_code = "ValidationError"
        http_status = 400
        is_sender_fault = True
        super().__init__(error_code, message, http_status, is_sender_fault)
1✔
48

49

50
class StsProvider(StsApi, ServiceLifecycleHook):
    """STS provider that forwards requests to moto and adds session-tag bookkeeping.

    ``assume_role`` validates the request, delegates to moto via ``call_moto`` and
    records the tags of the issued session in ``sts_stores``, keyed by the access key
    id of the returned credentials, so that transitive tags can be carried over when
    those credentials are used for a further ``assume_role`` call.
    """

    def __init__(self):
        apply_iam_patches()

    def accept_state_visitor(self, visitor: StateVisitor):
        """Let ``visitor`` visit both the moto STS backends and the STS stores."""
        from moto.sts.models import sts_backends

        visitor.visit(sts_backends)
        visitor.visit(sts_stores)

    def get_caller_identity(self, context: RequestContext, **kwargs) -> GetCallerIdentityResponse:
        """Return moto's caller identity, rewriting the ARN in one special case.

        If the ARN returned by moto contains both ``user/moto`` and ``sts``, it is
        replaced by the root ARN of the returned account in the request's partition.
        """
        response = call_moto(context)
        arn = response["Arn"]
        if "user/moto" in arn and "sts" in arn:
            response["Arn"] = f"arn:{context.partition}:iam::{response['Account']}:root"
        return response

    def assume_role(
        self,
        context: RequestContext,
        role_arn: arnType,
        role_session_name: roleSessionNameType,
        policy_arns: policyDescriptorListType = None,
        policy: unrestrictedSessionPolicyDocumentType = None,
        duration_seconds: roleDurationSecondsType = None,
        tags: tagListType = None,
        transitive_tag_keys: tagKeyListType = None,
        external_id: externalIdType = None,
        serial_number: serialNumberType = None,
        token_code: tokenCodeType = None,
        source_identity: sourceIdentityType = None,
        provided_contexts: ProvidedContextsListType = None,
        **kwargs,
    ) -> AssumeRoleResponse:
        """Validate the request, assume the role through moto and store session tags.

        Only ``role_arn``, ``role_session_name``, ``tags`` and ``transitive_tag_keys``
        are inspected here; the request itself is forwarded to moto through
        ``call_moto(context)``. Tag keys are compared case-insensitively (lower-cased).

        :param context: request context; supplies the fallback account id and the
            headers from which the calling access key id is extracted
        :param role_arn: ARN of the role to assume, checked against ``ROLE_ARN_REGEX``
        :param role_session_name: session name, checked against ``SESSION_NAME_REGEX``
        :param tags: session tags as a list of dicts with a ``Key`` entry
        :param transitive_tag_keys: keys of the tags to mark as transitive
        :return: the response produced by moto
        :raises ValidationError: if the role ARN or the session name is malformed
        :raises InvalidParameterValueError: if tag keys are duplicated, a tag collides
            with a transitive tag of the calling session, or a transitive tag key is
            not among the requested tags
        """
        # verify role arn
        if not ROLE_ARN_REGEX.match(role_arn):
            raise ValidationError(f"{role_arn} is invalid")

        # fullmatch() instead of match(): a pattern ending in `$` also matches right
        # before a trailing newline, which would let "name\n" pass the check.
        if not SESSION_NAME_REGEX.fullmatch(role_session_name):
            raise ValidationError(
                f"1 validation error detected: Value '{role_session_name}' at 'roleSessionName' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w+=,.@-]*"
            )

        target_account_id = extract_account_id_from_arn(role_arn) or context.account_id
        access_key_id = extract_access_key_id_from_auth_header(context.request.headers)
        # session configs are kept in the "us-east-1" store of the target account
        store = sts_stores[target_account_id]["us-east-1"]
        # session config of the credentials used for this call, if any was recorded
        existing_session_config = store.sessions.get(access_key_id, {})

        if tags:
            tag_keys = {tag["Key"].lower() for tag in tags}
            # if the lower-cased set is smaller than the number of keys, there have to be some duplicates.
            if len(tag_keys) < len(tags):
                raise InvalidParameterValueError(
                    "Duplicate tag keys found. Please note that Tag keys are case insensitive."
                )

            # prevent transitive tags from being overridden
            if existing_session_config:
                if set(existing_session_config["transitive_tags"]).intersection(tag_keys):
                    raise InvalidParameterValueError(
                        "One of the specified transitive tag keys can't be set because it conflicts with a transitive tag key from the calling session."
                    )
            if transitive_tag_keys:
                transitive_tag_key_set = {key.lower() for key in transitive_tag_keys}
                if not transitive_tag_key_set <= tag_keys:
                    raise InvalidParameterValueError(
                        "The specified transitive tag key must be included in the requested tags."
                    )

        response: AssumeRoleResponse = call_moto(context)

        # work on a copy so the list passed in by the caller is never mutated
        session_transitive_keys = list(transitive_tag_keys or [])
        transformed_tags = {tag["Key"].lower(): tag for tag in tags or []}
        # propagate transitive tags
        if existing_session_config:
            inherited_keys = existing_session_config["transitive_tags"]
            for tag in inherited_keys:
                transformed_tags[tag] = existing_session_config["tags"][tag]
            session_transitive_keys.extend(inherited_keys)
        if transformed_tags:
            # De-duplicate (order-preserving) and keep only keys that actually have a
            # tag: the propagation loop above indexes "tags" by every stored
            # transitive key, so a key without a tag would raise KeyError on the next
            # chained call. For requests that passed the validation above with tags
            # set, this leaves the stored keys unchanged.
            lowered_keys = dict.fromkeys(key.lower() for key in session_transitive_keys)
            # store session tagging config
            access_key_id = response["Credentials"]["AccessKeyId"]
            store.sessions[access_key_id] = SessionConfig(
                tags=transformed_tags,
                transitive_tags=[key for key in lowered_keys if key in transformed_tags],
                iam_context={},
            )
        return response
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc