• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-python / 391205

01 Jul 2024 10:53AM UTC coverage: 66.843% (-0.09%) from 66.932%
391205

push

python-ci

web-flow
UserAssigned MSI support (#2129)

* Added ManagedIdentity

* Missing ConfigurationServiceClientCredentialFactory awaits

* ManagedIdentityAppCredentials needs ManagedIdentity dict

* Added missing PermissionError descriptions

* Black reformatting in botbuilder-core

---------

Co-authored-by: Tracy Boehrer <trboehre@microsoft.com>

31 of 77 new or added lines in 8 files covered. (40.26%)

1 existing line in 1 file now uncovered.

9112 of 13632 relevant lines covered (66.84%)

2.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.38
/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/cloud_adapter.py
1
# Copyright (c) Microsoft Corporation. All rights reserved.
2
# Licensed under the MIT License.
3

4
from typing import Awaitable, Callable, Optional
4✔
5

6
from aiohttp.web import (
4✔
7
    Request,
8
    Response,
9
    json_response,
10
    WebSocketResponse,
11
    HTTPBadRequest,
12
    HTTPMethodNotAllowed,
13
    HTTPUnauthorized,
14
    HTTPUnsupportedMediaType,
15
)
16
from botbuilder.core import (
4✔
17
    Bot,
18
    CloudAdapterBase,
19
    InvokeResponse,
20
    TurnContext,
21
)
22
from botbuilder.core.streaming import (
4✔
23
    StreamingActivityProcessor,
24
    StreamingHttpDriver,
25
    StreamingRequestHandler,
26
)
27
from botbuilder.schema import Activity
4✔
28
from botbuilder.integration.aiohttp.streaming import AiohttpWebSocket
4✔
29
from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration
4✔
30
from botframework.connector.aio import ConnectorClient
4✔
31
from botframework.connector.auth import (
4✔
32
    AuthenticateRequestResult,
33
    BotFrameworkAuthentication,
34
    BotFrameworkAuthenticationFactory,
35
    ConnectorFactory,
36
    MicrosoftAppCredentials,
37
)
38

39
from .bot_framework_http_adapter_integration_base import (
4✔
40
    BotFrameworkHttpAdapterIntegrationBase,
41
)
42

43

44
class CloudAdapter(CloudAdapterBase, BotFrameworkHttpAdapterIntegrationBase):
4✔
45
    def __init__(self, bot_framework_authentication: BotFrameworkAuthentication = None):
4✔
46
        """
47
        Initializes a new instance of the CloudAdapter class.
48

49
        :param bot_framework_authentication: Optional BotFrameworkAuthentication instance
50
        """
51
        # pylint: disable=invalid-name
52
        if not bot_framework_authentication:
×
53
            bot_framework_authentication = BotFrameworkAuthenticationFactory.create()
×
54

55
        self._AUTH_HEADER_NAME = "authorization"
×
56
        self._CHANNEL_ID_HEADER_NAME = "channelid"
×
57
        super().__init__(bot_framework_authentication)
×
58

59
    async def process(
4✔
60
        self, request: Request, bot: Bot, ws_response: WebSocketResponse = None
61
    ) -> Optional[Response]:
62
        if not request:
×
63
            raise TypeError("request can't be None")
×
64
        # if ws_response is None:
65
        # raise TypeError("ws_response can't be None")
66
        if not bot:
×
67
            raise TypeError("bot can't be None")
×
68
        try:
×
69
            # Only GET requests for web socket connects are allowed
70
            if (
×
71
                request.method == "GET"
72
                and ws_response
73
                and ws_response.can_prepare(request)
74
            ):
75
                # All socket communication will be handled by the internal streaming-specific BotAdapter
76
                await self._connect(bot, request, ws_response)
×
77
            elif request.method == "POST":
×
78
                # Deserialize the incoming Activity
79
                if "application/json" in request.headers["Content-Type"]:
×
80
                    body = await request.json()
×
81
                else:
82
                    raise HTTPUnsupportedMediaType()
×
83

84
                activity: Activity = Activity().deserialize(body)
×
85

86
                # A POST request must contain an Activity
87
                if not activity.type:
×
88
                    raise HTTPBadRequest
×
89

90
                # Grab the auth header from the inbound http request
91
                auth_header = (
×
92
                    request.headers["Authorization"]
93
                    if "Authorization" in request.headers
94
                    else ""
95
                )
96

97
                # Process the inbound activity with the bot
98
                invoke_response = await self.process_activity(
×
99
                    auth_header, activity, bot.on_turn
100
                )
101

102
                # Write the response, serializing the InvokeResponse
103
                if invoke_response:
×
104
                    return json_response(
×
105
                        data=invoke_response.body, status=invoke_response.status
106
                    )
107
                return Response(status=201)
×
108
            else:
109
                raise HTTPMethodNotAllowed
×
NEW
110
        except PermissionError:
×
111
            raise HTTPUnauthorized
×
112

113
    async def _connect(
4✔
114
        self, bot: Bot, request: Request, ws_response: WebSocketResponse
115
    ):
116
        if ws_response is None:
×
117
            raise TypeError("ws_response can't be None")
×
118

119
        # Grab the auth header from the inbound http request
120
        auth_header = request.headers.get(self._AUTH_HEADER_NAME)
×
121
        # Grab the channelId which should be in the http headers
122
        channel_id = request.headers.get(self._CHANNEL_ID_HEADER_NAME)
×
123

124
        authentication_request_result = (
×
125
            await self.bot_framework_authentication.authenticate_streaming_request(
126
                auth_header, channel_id
127
            )
128
        )
129

130
        # Transition the request to a WebSocket connection
131
        await ws_response.prepare(request)
×
132
        bf_web_socket = AiohttpWebSocket(ws_response)
×
133

134
        streaming_activity_processor = _StreamingActivityProcessor(
×
135
            authentication_request_result, self, bot, bf_web_socket
136
        )
137

138
        await streaming_activity_processor.listen()
×
139

140

141
class _StreamingActivityProcessor(StreamingActivityProcessor):
4✔
142
    def __init__(
4✔
143
        self,
144
        authenticate_request_result: AuthenticateRequestResult,
145
        adapter: CloudAdapter,
146
        bot: Bot,
147
        web_socket: AiohttpWebSocket = None,
148
    ) -> None:
149
        self._authenticate_request_result = authenticate_request_result
×
150
        self._adapter = adapter
×
151

152
        # Internal reuse of the existing StreamingRequestHandler class
153
        self._request_handler = StreamingRequestHandler(bot, self, web_socket)
×
154

155
        # Fix up the connector factory so connector create from it will send over this connection
156
        self._authenticate_request_result.connector_factory = (
×
157
            _StreamingConnectorFactory(self._request_handler)
158
        )
159

160
    async def listen(self):
4✔
161
        await self._request_handler.listen()
×
162

163
    async def process_streaming_activity(
4✔
164
        self,
165
        activity: Activity,
166
        bot_callback_handler: Callable[[TurnContext], Awaitable],
167
    ) -> InvokeResponse:
168
        return await self._adapter.process_activity(
×
169
            self._authenticate_request_result, activity, bot_callback_handler
170
        )
171

172

173
class _StreamingConnectorFactory(ConnectorFactory):
4✔
174
    def __init__(self, request_handler: StreamingRequestHandler) -> None:
4✔
175
        self._request_handler = request_handler
×
176
        self._service_url = None
×
177

178
    async def create(
4✔
179
        self, service_url: str, audience: str  # pylint: disable=unused-argument
180
    ) -> ConnectorClient:
181
        if not self._service_url:
×
182
            self._service_url = service_url
×
183
        elif service_url != self._service_url:
×
184
            raise RuntimeError(
×
185
                "This is a streaming scenario, all connectors from this factory must all be for the same url."
186
            )
187

188
        # TODO: investigate if Driver and pipeline should be moved here
189
        streaming_driver = StreamingHttpDriver(self._request_handler)
×
190
        config = BotFrameworkConnectorConfiguration(
×
191
            MicrosoftAppCredentials.empty(),
192
            service_url,
193
            pipeline_type=AsyncBfPipeline,
194
            driver=streaming_driver,
195
        )
196
        streaming_driver.config = config
×
197
        connector_client = ConnectorClient(None, custom_configuration=config)
×
198

199
        return connector_client
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc