• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-python / 391205

01 Jul 2024 10:53AM UTC coverage: 66.843% (-0.09%) from 66.932%
391205

push

python-ci

web-flow
UserAssigned MSI support (#2129)

* Added ManagedIdentity

* Missing ConfigurationServiceClientCredentialFactory awaits

* ManagedIdentityAppCredentials needs ManagedIdentity dict

* Added missing PermissionError descriptions

* Black reformatting in botbuilder-core

---------

Co-authored-by: Tracy Boehrer <trboehre@microsoft.com>

31 of 77 new or added lines in 8 files covered. (40.26%)

1 existing line in 1 file now uncovered.

9112 of 13632 relevant lines covered (66.84%)

2.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.63
/libraries/botframework-connector/botframework/connector/auth/jwt_token_validation.py
1
# Copyright (c) Microsoft Corporation. All rights reserved.
2
# Licensed under the MIT License.
3

4
from typing import Dict, List, Union
4✔
5

6
from botbuilder.schema import Activity, RoleTypes
4✔
7

8
from ..channels import Channels
4✔
9
from .authentication_configuration import AuthenticationConfiguration
4✔
10
from .authentication_constants import AuthenticationConstants
4✔
11
from .emulator_validation import EmulatorValidation
4✔
12
from .enterprise_channel_validation import EnterpriseChannelValidation
4✔
13
from .channel_validation import ChannelValidation
4✔
14
from .credential_provider import CredentialProvider
4✔
15
from .claims_identity import ClaimsIdentity
4✔
16
from .government_constants import GovernmentConstants
4✔
17
from .government_channel_validation import GovernmentChannelValidation
4✔
18
from .skill_validation import SkillValidation
4✔
19
from .channel_provider import ChannelProvider
4✔
20

21

22
class JwtTokenValidation:
    """Static helpers that authenticate a request from its ``Authorization`` header.

    The class holds no state; it only groups the validation entry points.
    """

    # TODO remove the default value on channel_service
    @staticmethod
    async def authenticate_request(
        activity: Activity,
        auth_header: str,
        credentials: CredentialProvider,
        channel_service_or_provider: Union[str, ChannelProvider] = "",
        auth_configuration: AuthenticationConfiguration = None,
    ) -> ClaimsIdentity:
        """Authenticate an incoming request and return its claims identity.

        :param activity: The incoming Activity from the Bot Framework or the Emulator
        :type activity: ~botframework.connector.models.Activity
        :param auth_header: The Bearer token included as part of the request
        :type auth_header: str
        :param credentials: The set of valid credentials, such as the Bot Application ID
        :type credentials: CredentialProvider
        :param channel_service_or_provider: Channel service string or a ChannelProvider
        :type channel_service_or_provider: Union[str, ChannelProvider]
        :param auth_configuration: Authentication configuration
        :type auth_configuration: AuthenticationConfiguration

        :return: The identity produced by header validation, or an anonymous
            identity when no header was sent and authentication is disabled.
        :raises PermissionError: If no header was sent while authentication is enabled.
        """
        if not auth_header:
            # No auth header was sent. We might be on the anonymous code path.
            auth_is_disabled = await credentials.is_authentication_disabled()
            if not auth_is_disabled:
                # No Auth Header. Auth is required. Request is not authorized.
                raise PermissionError("Required Authorization token was not supplied")

            # Check if the activity is for a skill call and is coming from the Emulator.
            try:
                if (
                    activity.channel_id == Channels.emulator
                    and activity.recipient.role == RoleTypes.skill
                    and activity.relates_to is not None
                ):
                    # Return an anonymous claim with an anonymous skill AppId
                    return SkillValidation.create_anonymous_skill_claim()
            except AttributeError:
                # Deliberate best-effort: a missing activity or recipient
                # (e.g. None) simply means this is not an emulator skill call.
                pass

            # In the scenario where Auth is disabled, we still want to have the
            # IsAuthenticated flag set in the ClaimsIdentity. To do this requires
            # adding in an empty claim.
            return ClaimsIdentity({}, True, AuthenticationConstants.ANONYMOUS_AUTH_TYPE)

        # Validate the header and extract claims.
        return await JwtTokenValidation.validate_auth_header(
            auth_header,
            credentials,
            channel_service_or_provider,
            activity.channel_id,
            activity.service_url,
            auth_configuration,
        )

    @staticmethod
    async def validate_auth_header(
        auth_header: str,
        credentials: CredentialProvider,
        channel_service_or_provider: Union[str, ChannelProvider],
        channel_id: str,
        service_url: str = None,
        auth_configuration: AuthenticationConfiguration = None,
    ) -> ClaimsIdentity:
        """Validate ``auth_header`` with the validator matching its origin.

        Validators are tried in this order: skill token, emulator token, then
        the channel validator selected by ``channel_service_or_provider``
        (public Azure, government, otherwise enterprise). When ``service_url``
        is supplied, the ``*_with_service_url`` variant of the channel
        validator is used.

        :param auth_header: The Bearer token included as part of the request
        :param credentials: The set of valid credentials
        :param channel_service_or_provider: Channel service string or a ChannelProvider
        :param channel_id: The channel id of the request
        :param service_url: Optional service url of the request
        :param auth_configuration: Authentication configuration

        :return: The claims identity returned by the selected validator.
        :raises ValueError: If ``auth_header`` is empty or None.
        :raises PermissionError: If the identity carries skill claims and no
            claims validator is configured (see ``validate_claims``).
        """
        if not auth_header:
            raise ValueError("argument auth_header is null")

        async def get_claims() -> ClaimsIdentity:
            if SkillValidation.is_skill_token(auth_header):
                return await SkillValidation.authenticate_channel_token(
                    auth_header,
                    credentials,
                    channel_service_or_provider,
                    channel_id,
                    auth_configuration,
                )

            if EmulatorValidation.is_token_from_emulator(auth_header):
                return await EmulatorValidation.authenticate_emulator_token(
                    auth_header, credentials, channel_service_or_provider, channel_id
                )

            # Parentheses only make the existing and/or precedence explicit.
            is_public = not channel_service_or_provider or (
                isinstance(channel_service_or_provider, ChannelProvider)
                and channel_service_or_provider.is_public_azure()
            )
            is_gov = (
                isinstance(channel_service_or_provider, ChannelProvider)
                and channel_service_or_provider.is_government()
            ) or (
                isinstance(channel_service_or_provider, str)
                and JwtTokenValidation.is_government(channel_service_or_provider)
            )

            # If the channel is Public Azure
            if is_public:
                if service_url:
                    return await ChannelValidation.authenticate_channel_token_with_service_url(
                        auth_header,
                        credentials,
                        service_url,
                        channel_id,
                        auth_configuration,
                    )

                return await ChannelValidation.authenticate_channel_token(
                    auth_header, credentials, channel_id, auth_configuration
                )

            if is_gov:
                if service_url:
                    return await GovernmentChannelValidation.authenticate_channel_token_with_service_url(
                        auth_header,
                        credentials,
                        service_url,
                        channel_id,
                        auth_configuration,
                    )

                return await GovernmentChannelValidation.authenticate_channel_token(
                    auth_header, credentials, channel_id, auth_configuration
                )

            # Otherwise use Enterprise Channel Validation
            if service_url:
                return await EnterpriseChannelValidation.authenticate_channel_token_with_service_url(
                    auth_header,
                    credentials,
                    service_url,
                    channel_id,
                    channel_service_or_provider,
                    auth_configuration,
                )

            return await EnterpriseChannelValidation.authenticate_channel_token(
                auth_header,
                credentials,
                channel_id,
                channel_service_or_provider,
                auth_configuration,
            )

        claims = await get_claims()

        if claims:
            await JwtTokenValidation.validate_claims(auth_configuration, claims.claims)

        return claims

    @staticmethod
    async def validate_claims(
        auth_config: AuthenticationConfiguration, claims: List[Dict]
    ):
        """Run the configured claims validator, or reject unvalidated skill claims.

        :param auth_config: Authentication configuration; may be None
        :param claims: The claims to validate
        :raises PermissionError: If no claims validator is configured and
            ``claims`` are skill claims.
        """
        # NOTE(review): callers in this class pass ``ClaimsIdentity.claims``,
        # which looks like a dict rather than a list of dicts - confirm the
        # annotation against ClaimsIdentity.
        if auth_config and auth_config.claims_validator:
            await auth_config.claims_validator(claims)
        elif SkillValidation.is_skill_claim(claims):
            # Skill claims must be validated using AuthenticationConfiguration claims_validator
            raise PermissionError(
                "Unauthorized Access. Request is not authorized. Skill Claims require validation."
            )

    @staticmethod
    def is_government(channel_service: str) -> bool:
        """Return True when ``channel_service`` names the government channel service.

        The comparison is case-insensitive. An empty or None value yields
        ``False`` (a real bool, not the falsy input itself).

        :param channel_service: The channel service string; may be empty or None
        """
        return bool(channel_service) and (
            channel_service.lower() == GovernmentConstants.CHANNEL_SERVICE
        )

    @staticmethod
    def get_app_id_from_claims(claims: Dict[str, object]) -> str:
        """Return the app id stored in ``claims``, or None if it cannot be determined.

        :param claims: Mapping of claim name to claim value
        :return: The value of the "appid" claim for version "1.0" (or missing
            version) tokens, the authorized party claim for version "2.0"
            tokens, and None for any other version.
        """
        app_id = None

        # Depending on the token version, the app id is either in the
        # appid claim (Version 1) or the Authorized Party claim (Version 2).
        token_version = claims.get(AuthenticationConstants.VERSION_CLAIM)

        if not token_version or token_version == "1.0":
            # either no Version or a version of "1.0" means we should look for
            # the claim in the "appid" claim.
            app_id = claims.get(AuthenticationConstants.APP_ID_CLAIM)
        elif token_version == "2.0":
            app_id = claims.get(AuthenticationConstants.AUTHORIZED_PARTY)

        return app_id

    @staticmethod
    def is_valid_token_format(auth_header: str) -> bool:
        """Return True when ``auth_header`` has the form ``"Bearer <token>"``.

        The header must split on single spaces into exactly two parts and the
        first part must be exactly ``"Bearer"`` (case-sensitive).

        :param auth_header: The raw Authorization header value; may be empty or None
        """
        if not auth_header:
            # No header at all, so there is nothing to validate.
            return False

        parts = auth_header.split(" ")
        if len(parts) != 2:
            # A well-formed header has exactly 2 parts: the scheme and the token.
            return False

        auth_scheme = parts[0]

        # The scheme MUST be "Bearer"
        return auth_scheme == "Bearer"
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc