• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 9394559170

26 Apr 2024 06:36AM UTC coverage: 94.656% (-0.02%) from 94.674%
9394559170

push

github

xmatthias
Loader should be passed as kwarg for clarity

20280 of 21425 relevant lines covered (94.66%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.0
/freqtrade/rpc/api_server/ws/channel.py
1
import asyncio
1✔
2
import logging
1✔
3
import time
1✔
4
from collections import deque
1✔
5
from contextlib import asynccontextmanager
1✔
6
from typing import Any, AsyncIterator, Deque, Dict, List, Optional, Type, Union
1✔
7
from uuid import uuid4
1✔
8

9
from fastapi import WebSocketDisconnect
1✔
10
from websockets.exceptions import ConnectionClosed
1✔
11

12
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
1✔
13
from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer,
1✔
14
                                                    WebSocketSerializer)
15
from freqtrade.rpc.api_server.ws.types import WebSocketType
1✔
16
from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType
1✔
17

18

19
logger = logging.getLogger(__name__)
1✔
20

21

22
class WebSocketChannel:
1✔
23
    """
24
    Object to help facilitate managing a websocket connection
25
    """
26
    def __init__(
1✔
27
        self,
28
        websocket: WebSocketType,
29
        channel_id: Optional[str] = None,
30
        serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer,
31
        send_throttle: float = 0.01
32
    ):
33
        self.channel_id = channel_id if channel_id else uuid4().hex[:8]
1✔
34
        self._websocket = WebSocketProxy(websocket)
1✔
35

36
        # Internal event to signify a closed websocket
37
        self._closed = asyncio.Event()
1✔
38
        # The async tasks created for the channel
39
        self._channel_tasks: List[asyncio.Task] = []
1✔
40

41
        # Deque for average send times
42
        self._send_times: Deque[float] = deque([], maxlen=10)
1✔
43
        # High limit defaults to 3 to start
44
        self._send_high_limit = 3
1✔
45
        self._send_throttle = send_throttle
1✔
46

47
        # The subscribed message types
48
        self._subscriptions: List[str] = []
1✔
49

50
        # Wrap the WebSocket in the Serializing class
51
        self._wrapped_ws = serializer_cls(self._websocket)
1✔
52

53
    def __repr__(self):
1✔
54
        return f"WebSocketChannel({self.channel_id}, {self.remote_addr})"
1✔
55

56
    @property
1✔
57
    def raw_websocket(self):
1✔
58
        return self._websocket.raw_websocket
×
59

60
    @property
1✔
61
    def remote_addr(self):
1✔
62
        return self._websocket.remote_addr
1✔
63

64
    @property
1✔
65
    def avg_send_time(self):
1✔
66
        return sum(self._send_times) / len(self._send_times)
×
67

68
    def _calc_send_limit(self):
1✔
69
        """
70
        Calculate the send high limit for this channel
71
        """
72

73
        # Only update if we have enough data
74
        if len(self._send_times) == self._send_times.maxlen:
1✔
75
            # At least 1s or twice the average of send times, with a
76
            # maximum of 3 seconds per message
77
            self._send_high_limit = min(max(self.avg_send_time * 2, 1), 3)
×
78

79
    async def send(
1✔
80
        self,
81
        message: Union[WSMessageSchemaType, Dict[str, Any]],
82
        timeout: bool = False
83
    ):
84
        """
85
        Send a message on the wrapped websocket. If the sending
86
        takes too long, it will raise a TimeoutError and
87
        disconnect the connection.
88

89
        :param message: The message to send
90
        :param timeout: Enforce send high limit, defaults to False
91
        """
92
        try:
1✔
93
            _ = time.time()
1✔
94
            # If the send times out, it will raise
95
            # a TimeoutError and bubble up to the
96
            # message_endpoint to close the connection
97
            await asyncio.wait_for(
1✔
98
                self._wrapped_ws.send(message),
99
                timeout=self._send_high_limit if timeout else None
100
            )
101
            total_time = time.time() - _
1✔
102
            self._send_times.append(total_time)
1✔
103

104
            self._calc_send_limit()
1✔
105
        except asyncio.TimeoutError:
1✔
106
            logger.info(f"Connection for {self} timed out, disconnecting")
×
107
            raise
×
108

109
        # Explicitly give control back to event loop as
110
        # websockets.send does not
111
        # Also throttles how fast we send
112
        await asyncio.sleep(self._send_throttle)
1✔
113

114
    async def recv(self):
1✔
115
        """
116
        Receive a message on the wrapped websocket
117
        """
118
        return await self._wrapped_ws.recv()
1✔
119

120
    async def ping(self):
1✔
121
        """
122
        Ping the websocket
123
        """
124
        return await self._websocket.ping()
×
125

126
    async def accept(self):
1✔
127
        """
128
        Accept the underlying websocket connection,
129
        if the connection has been closed before we can
130
        accept, just close the channel.
131
        """
132
        try:
1✔
133
            return await self._websocket.accept()
1✔
134
        except RuntimeError:
×
135
            await self.close()
×
136

137
    async def close(self):
1✔
138
        """
139
        Close the WebSocketChannel
140
        """
141

142
        self._closed.set()
1✔
143

144
        try:
1✔
145
            await self._websocket.close()
1✔
146
        except RuntimeError:
×
147
            pass
×
148

149
    def is_closed(self) -> bool:
1✔
150
        """
151
        Closed flag
152
        """
153
        return self._closed.is_set()
1✔
154

155
    def set_subscriptions(self, subscriptions: List[str]) -> None:
1✔
156
        """
157
        Set which subscriptions this channel is subscribed to
158

159
        :param subscriptions: List of subscriptions, List[str]
160
        """
161
        self._subscriptions = subscriptions
×
162

163
    def subscribed_to(self, message_type: str) -> bool:
1✔
164
        """
165
        Check if this channel is subscribed to the message_type
166

167
        :param message_type: The message type to check
168
        """
169
        return message_type in self._subscriptions
×
170

171
    async def run_channel_tasks(self, *tasks, **kwargs):
1✔
172
        """
173
        Create and await on the channel tasks unless an exception
174
        was raised, then cancel them all.
175

176
        :params *tasks: All coros or tasks to be run concurrently
177
        :param **kwargs: Any extra kwargs to pass to gather
178
        """
179

180
        if not self.is_closed():
1✔
181
            # Wrap the coros into tasks if they aren't already
182
            self._channel_tasks = [
1✔
183
                task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
184
                for task in tasks
185
            ]
186

187
            try:
1✔
188
                return await asyncio.gather(*self._channel_tasks, **kwargs)
1✔
189
            except Exception:
1✔
190
                # If an exception occurred, cancel the rest of the tasks
191
                await self.cancel_channel_tasks()
1✔
192

193
    async def cancel_channel_tasks(self):
1✔
194
        """
195
        Cancel and wait on all channel tasks
196
        """
197
        for task in self._channel_tasks:
1✔
198
            task.cancel()
1✔
199

200
            # Wait for tasks to finish cancelling
201
            try:
1✔
202
                await task
1✔
203
            except (
1✔
204
                asyncio.CancelledError,
205
                asyncio.TimeoutError,
206
                WebSocketDisconnect,
207
                ConnectionClosed,
208
                RuntimeError
209
            ):
210
                pass
1✔
211
            except Exception as e:
×
212
                logger.info(f"Encountered unknown exception: {e}", exc_info=e)
×
213

214
        self._channel_tasks = []
1✔
215

216
    async def __aiter__(self):
1✔
217
        """
218
        Generator for received messages
219
        """
220
        # We can not catch any errors here as websocket.recv is
221
        # the first to catch any disconnects and bubble it up
222
        # so the connection is garbage collected right away
223
        while not self.is_closed():
1✔
224
            yield await self.recv()
1✔
225

226

227
@asynccontextmanager
1✔
228
async def create_channel(
1✔
229
    websocket: WebSocketType,
230
    **kwargs
231
) -> AsyncIterator[WebSocketChannel]:
232
    """
233
    Context manager for safely opening and closing a WebSocketChannel
234
    """
235
    channel = WebSocketChannel(websocket, **kwargs)
1✔
236
    try:
1✔
237
        await channel.accept()
1✔
238
        logger.info(f"Connected to channel - {channel}")
1✔
239

240
        yield channel
1✔
241
    finally:
242
        await channel.close()
1✔
243
        logger.info(f"Disconnected from channel - {channel}")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc