• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-python / 393243

07 Aug 2024 01:57PM UTC coverage: 66.988% (+0.003%) from 66.985%
393243

push

python-ci

web-flow
Bump PyYAML 6.0 to 6.0.1 (#2151)

9180 of 13704 relevant lines covered (66.99%)

2.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.0
/libraries/botbuilder-core/botbuilder/core/streaming/bot_framework_http_adapter_base.py
1
# Copyright (c) Microsoft Corporation. All rights reserved.
2
# Licensed under the MIT License.
3

4
from http import HTTPStatus
4✔
5
from typing import Awaitable, Callable, List
4✔
6

7
from botbuilder.core import (
4✔
8
    Bot,
9
    BotFrameworkAdapter,
10
    BotFrameworkAdapterSettings,
11
    InvokeResponse,
12
    TurnContext,
13
)
14
from botbuilder.schema import Activity, ActivityTypes, ResourceResponse
4✔
15
from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration
4✔
16
from botframework.connector.aio import ConnectorClient
4✔
17
from botframework.connector.auth import (
4✔
18
    ClaimsIdentity,
19
    MicrosoftAppCredentials,
20
    MicrosoftGovernmentAppCredentials,
21
)
22

23
from .streaming_activity_processor import StreamingActivityProcessor
4✔
24
from .streaming_request_handler import StreamingRequestHandler
4✔
25
from .streaming_http_client import StreamingHttpDriver
4✔
26

27

28
class BotFrameworkHttpAdapterBase(BotFrameworkAdapter, StreamingActivityProcessor):
4✔
29
    # pylint: disable=pointless-string-statement
30
    def __init__(self, settings: BotFrameworkAdapterSettings):
4✔
31
        super().__init__(settings)
×
32

33
        self.connected_bot: Bot = None
×
34
        self.claims_identity: ClaimsIdentity = None
×
35
        self.request_handlers: List[StreamingRequestHandler] = None
×
36

37
    async def process_streaming_activity(
4✔
38
        self,
39
        activity: Activity,
40
        bot_callback_handler: Callable[[TurnContext], Awaitable],
41
    ) -> InvokeResponse:
42
        if not activity:
×
43
            raise TypeError(
×
44
                f"'activity: {activity.__class__.__name__}' argument can't be None"
45
            )
46

47
        """
48
        If a conversation has moved from one connection to another for the same Channel or Skill and
49
        hasn't been forgotten by the previous StreamingRequestHandler. The last requestHandler
50
        the conversation has been associated with should always be the active connection.
51
        """
52
        request_handler = [
×
53
            handler
54
            for handler in self.request_handlers
55
            if handler.service_url == activity.service_url
56
            and handler.has_conversation(activity.conversation.id)
57
        ]
58
        request_handler = request_handler[-1] if request_handler else None
×
59
        context = TurnContext(self, activity)
×
60

61
        if self.claims_identity:
×
62
            context.turn_state[self.BOT_IDENTITY_KEY] = self.claims_identity
×
63

64
        connector_client = self._create_streaming_connector_client(
×
65
            activity, request_handler
66
        )
67
        context.turn_state[self.BOT_CONNECTOR_CLIENT_KEY] = connector_client
×
68

69
        await self.run_pipeline(context, bot_callback_handler)
×
70

71
        if activity.type == ActivityTypes.invoke:
×
72
            activity_invoke_response = context.turn_state.get(self._INVOKE_RESPONSE_KEY)
×
73

74
            if not activity_invoke_response:
×
75
                return InvokeResponse(status=HTTPStatus.NOT_IMPLEMENTED)
×
76
            return activity_invoke_response.value
×
77

78
        return None
×
79

80
    async def send_streaming_activity(self, activity: Activity) -> ResourceResponse:
4✔
81
        raise NotImplementedError()
×
82

83
    def can_process_outgoing_activity(self, activity: Activity) -> bool:
4✔
84
        if not activity:
×
85
            raise TypeError(
×
86
                f"'activity: {activity.__class__.__name__}' argument can't be None"
87
            )
88

89
        return not activity.service_url.startswith("https")
×
90

91
    async def process_outgoing_activity(
4✔
92
        self, _turn_context: TurnContext, activity: Activity
93
    ) -> ResourceResponse:
94
        if not activity:
×
95
            raise TypeError(
×
96
                f"'activity: {activity.__class__.__name__}' argument can't be None"
97
            )
98

99
        # TODO: Check if we have token responses from OAuth cards.
100

101
        # The ServiceUrl for streaming channels begins with the string "urn" and contains
102
        # information unique to streaming connections. Now that we know that this is a streaming
103
        # activity, process it in the streaming pipeline.
104
        # Process streaming activity.
105
        return await self.send_streaming_activity(activity)
×
106

107
    def _create_streaming_connector_client(
4✔
108
        self, activity: Activity, request_handler: StreamingRequestHandler
109
    ) -> ConnectorClient:
110
        empty_credentials = (
×
111
            MicrosoftAppCredentials.empty()
112
            if self._channel_provider and self._channel_provider.is_government()
113
            else MicrosoftGovernmentAppCredentials.empty()
114
        )
115
        streaming_driver = StreamingHttpDriver(request_handler)
×
116
        config = BotFrameworkConnectorConfiguration(
×
117
            empty_credentials,
118
            activity.service_url,
119
            pipeline_type=AsyncBfPipeline,
120
            driver=streaming_driver,
121
        )
122
        streaming_driver.config = config
×
123
        connector_client = ConnectorClient(None, custom_configuration=config)
×
124

125
        return connector_client
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc