• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-python / 393243

07 Aug 2024 01:57PM UTC coverage: 66.988% (+0.003%) from 66.985%
393243

push

python-ci

web-flow
Bump PyYAML 6.0 to 6.0.1 (#2151)

9180 of 13704 relevant lines covered (66.99%)

2.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.41
/libraries/botbuilder-core/botbuilder/core/streaming/streaming_request_handler.py
1
# Copyright (c) Microsoft Corporation. All rights reserved.
2
# Licensed under the MIT License.
3

4
import platform
4✔
5
import traceback
4✔
6
from http import HTTPStatus
4✔
7
from datetime import datetime
4✔
8
from logging import Logger
4✔
9
from json import loads
4✔
10
from typing import Dict, List
4✔
11

12
from botbuilder.core import Bot
4✔
13
from botbuilder.schema import Activity, Attachment, ResourceResponse
4✔
14
from botframework.streaming import (
4✔
15
    RequestHandler,
16
    ReceiveRequest,
17
    ReceiveResponse,
18
    StreamingRequest,
19
    StreamingResponse,
20
    __title__,
21
    __version__,
22
)
23
from botframework.streaming.transport import DisconnectedEventArgs
4✔
24
from botframework.streaming.transport.web_socket import WebSocket, WebSocketServer
4✔
25

26
from .streaming_activity_processor import StreamingActivityProcessor
4✔
27
from .version_info import VersionInfo
4✔
28

29

30
class StreamContent:
    """Bundle a stream payload together with the headers that describe it."""

    def __init__(self, stream: List[int], *, headers: Dict[str, str] = None):
        """Store the payload and its headers.

        :param stream: The payload, kept by reference on ``self.stream``.
        :param headers: Optional keyword-only header mapping. When omitted (``None``)
            a new empty dict is created for this instance; otherwise the mapping
            passed in is stored as-is (not copied).
        """
        if headers is None:
            # Create the dict per instance so instances never share header state.
            headers = {}

        self.stream = stream
        self.headers: Dict[str, str] = headers
×
34

35

36
class StreamingRequestHandler(RequestHandler):
    """Request handler bound to a single web socket based streaming server.

    Incoming POST requests are converted into activities and handed to the
    activity processor together with the bot's turn handler; outgoing
    activities and raw streaming requests are sent through the same server.
    """

    def __init__(
        self,
        bot: Bot,
        activity_processor: StreamingActivityProcessor,
        web_socket: WebSocket,
        logger: Logger = None,
    ):
        """Create the handler and the ``WebSocketServer`` it sends and receives through.

        :param bot: Bot whose ``on_turn`` is passed to the activity processor.
        :param activity_processor: Processor that runs each received activity.
        :param web_socket: Socket wrapped by the ``WebSocketServer``.
        :param logger: Optional logger; stored on the instance.
        :raises TypeError: If ``bot`` or ``activity_processor`` is missing (falsy).
        """
        if not bot:
            raise TypeError(f"'bot: {bot.__class__.__name__}' argument can't be None")
        if not activity_processor:
            raise TypeError(
                f"'activity_processor: {activity_processor.__class__.__name__}' argument can't be None"
            )

        self._bot = bot
        self._activity_processor = activity_processor
        self._logger = logger
        # Conversation id -> time this handler first saw that conversation.
        self._conversations: Dict[str, datetime] = {}
        self._user_agent = StreamingRequestHandler._get_user_agent()
        self._server = WebSocketServer(web_socket, self)
        # Flipped to False by _server_disconnected; checked before every send.
        self._server_is_connected = True
        self._server.disconnected_event_handler = self._server_disconnected
        # Unknown until the first activity is received (see process_request).
        self._service_url: str = None

    @property
    def service_url(self) -> str:
        """Service URL taken from the first received activity, or ``None`` before that."""
        return self._service_url

    async def listen(self):
        """Start the underlying streaming server."""
        await self._server.start()

        # TODO: log it

    def has_conversation(self, conversation_id: str) -> bool:
        """Return ``True`` if this handler has recorded the given conversation id."""
        return conversation_id in self._conversations

    def conversation_added_time(self, conversation_id: str) -> datetime:
        """Return when the conversation was first recorded, or ``datetime.min`` if unknown."""
        return self._conversations.get(conversation_id) or datetime.min

    def forget_conversation(self, conversation_id: str):
        """Remove the conversation from this handler's records.

        Forgetting a conversation that is not recorded is a no-op, so the call
        is safe to repeat.
        """
        self._conversations.pop(conversation_id, None)

    async def process_request(
        self,
        request: ReceiveRequest,
        logger: Logger,  # pylint: disable=unused-argument
        context: object,  # pylint: disable=unused-argument
    ) -> StreamingResponse:
        """Turn a received streaming request into an activity and run it through the bot.

        :param request: The received request.
        :param logger: Unused.
        :param context: Unused.
        :return: For POST requests, a response carrying: 400 if the body could not
            be read; 500 plus the error text if processing raised; otherwise the
            status/body returned by the activity processor (200 when it returned
            nothing). Non-POST requests yield whatever ``_handle_custom_paths``
            returns, which is ``None`` for unrecognized verb/path combinations.
        """
        response = StreamingResponse()

        # We accept all POSTs regardless of path, but anything else requires special treatment.
        if request.verb != StreamingRequest.POST:
            return self._handle_custom_paths(request, response)

        # Convert the StreamingRequest into an activity the adapter can understand.
        try:
            body_str = await request.read_body_as_str()
        except Exception:
            traceback.print_exc()
            response.status_code = int(HTTPStatus.BAD_REQUEST)
            # TODO: log error

            return response

        try:
            # TODO: validate if should use deserialize or from_dict
            body_dict = loads(body_str)
            activity: Activity = Activity.deserialize(body_dict)

            # All activities received by this StreamingRequestHandler will originate from the same channel, but we won't
            # know what that channel is until we've received the first request.
            if not self.service_url:
                self._service_url = activity.service_url

            # If this is the first time the handler has seen this conversation it needs to be added to the dictionary so
            # the adapter is able to route requests to the correct handler.
            if not self.has_conversation(activity.conversation.id):
                self._conversations[activity.conversation.id] = datetime.now()

            # Any content sent as part of a StreamingRequest, including the request body
            # and inline attachments, appears as streams added to the same collection. The
            # first stream of any request is the body (already parsed above). Any additional
            # streams are inline attachments that need to be added to the Activity as
            # attachments to be sent to the Bot.
            if len(request.streams) > 1:
                # Skip index 0: it is the request body, not an attachment.
                stream_attachments = [
                    Attachment(content_type=stream.content_type, content=stream.stream)
                    for index, stream in enumerate(request.streams)
                    if index > 0
                ]

                if activity.attachments:
                    activity.attachments += stream_attachments
                else:
                    activity.attachments = stream_attachments

            # Now that the request has been converted into an activity we can send it to the adapter.
            adapter_response = (
                await self._activity_processor.process_streaming_activity(
                    activity, self._bot.on_turn
                )
            )

            # Now we convert the invokeResponse returned by the adapter into a StreamingResponse we can send back
            # to the channel.
            if not adapter_response:
                response.status_code = int(HTTPStatus.OK)
            else:
                response.status_code = adapter_response.status
                if adapter_response.body:
                    response.set_body(adapter_response.body)

        except Exception as error:
            traceback.print_exc()
            response.status_code = int(HTTPStatus.INTERNAL_SERVER_ERROR)
            response.set_body(str(error))
            # TODO: log error

        return response

    async def send_activity(self, activity: Activity) -> ResourceResponse:
        """Send an activity over the streaming connection.

        Stream-like attachments (see ``_update_attachment_streams``) are removed
        from the activity and added to the request as separate streams.

        :param activity: The activity to send.
        :return: The parsed ``ResourceResponse`` when the server answers with 200;
            ``None`` for any other status, when the transport is disconnected, or
            when sending raised (the traceback is printed, nothing is re-raised).
        """
        conversation_id = activity.conversation.id if activity.conversation else ""
        request_path = f"/v3/conversations/{conversation_id}/activities"
        if activity.reply_to_id:
            request_path = f"{request_path}/{activity.reply_to_id}"

        stream_attachments = self._update_attachment_streams(activity)
        request = StreamingRequest.create_post(request_path)
        request.set_body(activity)
        if stream_attachments:
            for attachment in stream_attachments:
                # TODO: might be necessary to serialize this before adding
                request.add_stream(attachment)

        try:
            if not self._server_is_connected:
                raise Exception(
                    "Error while attempting to send: Streaming transport is disconnected."
                )

            server_response = await self._server.send(request)

            if server_response.status_code == HTTPStatus.OK:
                return server_response.read_body_as_json(ResourceResponse)
        except Exception:
            # TODO: log error
            traceback.print_exc()

        return None

    async def send_streaming_request(
        self, request: StreamingRequest
    ) -> ReceiveResponse:
        """Send an already-built streaming request through the server.

        :param request: The request to send.
        :return: The server's response, or ``None`` when the transport is
            disconnected or sending raised (the traceback is printed).
        """
        try:
            if not self._server_is_connected:
                raise Exception(
                    "Error while attempting to send: Streaming transport is disconnected."
                )

            return await self._server.send(request)
        except Exception:
            # TODO: remove printing and log it
            traceback.print_exc()

        return None

    @staticmethod
    def _get_user_agent() -> str:
        """Build the user agent string: ``<title>/<version> (<machine>-<system>-<version>; Python,Version=<x.y.z>)``."""
        uname = platform.uname()
        os_version = f"{uname.machine}-{uname.system}-{uname.version}"
        py_version = f"Python,Version={platform.python_version()}"
        return f"{__title__}/{__version__} ({os_version}; {py_version})"

    def _update_attachment_streams(self, activity: Activity) -> List[object]:
        """Split stream-like attachments out of the activity.

        An attachment counts as a stream when its ``content`` is a ``list`` whose
        elements are all ``int`` (an empty list qualifies). When at least one such
        attachment exists, ``activity.attachments`` is replaced by the remaining
        attachments and the extracted ones are returned wrapped in ``StreamContent``.

        :param activity: Activity whose attachments are inspected (mutated in place).
        :return: List of ``StreamContent``, or ``None`` when there is no activity,
            no attachments, or no stream-like attachment.
        """
        if not activity or not activity.attachments:
            return None

        def is_int_list(obj: object) -> bool:
            return isinstance(obj, list) and all(
                isinstance(element, int) for element in obj
            )

        # Partition once so every attachment's content is validated a single time.
        stream_attachments = []
        other_attachments = []
        for attachment in activity.attachments:
            if is_int_list(attachment.content):
                stream_attachments.append(attachment)
            else:
                other_attachments.append(attachment)

        if not stream_attachments:
            return None

        activity.attachments = other_attachments

        # TODO: validate StreamContent parallel
        return [
            StreamContent(
                attachment.content,
                headers={"Content-Type": attachment.content_type},
            )
            for attachment in stream_attachments
        ]

    def _server_disconnected(
        self,
        sender: object,  # pylint: disable=unused-argument
        event: DisconnectedEventArgs,  # pylint: disable=unused-argument
    ):
        """Disconnected-event callback: mark the transport as unusable for sending."""
        self._server_is_connected = False

    def _handle_custom_paths(
        self, request: ReceiveRequest, response: StreamingResponse
    ) -> StreamingResponse:
        """Handle requests that are not POSTs.

        :param request: The received request.
        :param response: Response object to fill in and return.
        :return: ``response`` with 400 when the request, its verb or its path is
            missing; ``response`` with 200 and a ``VersionInfo`` body for
            ``GET /api/version``; ``None`` for anything else.
        """
        if not request or not request.verb or not request.path:
            response.status_code = int(HTTPStatus.BAD_REQUEST)
            # TODO: log error

            return response

        if request.verb == StreamingRequest.GET and request.path == "/api/version":
            response.status_code = int(HTTPStatus.OK)
            response.set_body(VersionInfo(user_agent=self._user_agent))

            return response

        return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc