• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sammchardy / python-binance / 12482124532

24 Dec 2024 01:08PM UTC coverage: 68.929% (-0.03%) from 68.96%
12482124532

Pull #1525

github

web-flow
Merge bb21af25a into d8e2ce169
Pull Request #1525: Util for generating endpoints added

2915 of 4229 relevant lines covered (68.93%)

4.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.11
/binance/ws/reconnecting_websocket.py
1
import asyncio
6✔
2
import gzip
6✔
3
import json
6✔
4
import logging
6✔
5
from socket import gaierror
6✔
6
from typing import Optional
6✔
7
from asyncio import sleep
6✔
8
from random import random
6✔
9

10
# load orjson if available, otherwise default to json
11
orjson = None
6✔
12
try:
6✔
13
    import orjson as orjson
6✔
14
except ImportError:
6✔
15
    pass
6✔
16

17
try:
6✔
18
    from websockets.exceptions import ConnectionClosedError  # type: ignore
6✔
19
except ImportError:
×
20
    from websockets import ConnectionClosedError  # type: ignore
×
21

22

23
Proxy = None
6✔
24
proxy_connect = None
6✔
25
try:
6✔
26
    from websockets_proxy import Proxy as w_Proxy, proxy_connect as w_proxy_connect
6✔
27

28
    Proxy = w_Proxy
5✔
29
    proxy_connect = w_proxy_connect
5✔
30
except ImportError:
1✔
31
    pass
1✔
32

33
import websockets as ws
6✔
34

35
from binance.exceptions import (
6✔
36
    BinanceWebsocketClosed,
37
    BinanceWebsocketUnableToConnect,
38
    BinanceWebsocketQueueOverflow,
39
)
40
from binance.helpers import get_loop
6✔
41
from binance.ws.constants import WSListenerState
6✔
42

43

44
class ReconnectingWebsocket:
    """Websocket client with a background read loop and automatic reconnects.

    ``connect()`` opens the connection (optionally through a proxy when
    ``https_proxy`` is given) and schedules ``_read_loop()``. The read loop
    decodes each incoming message and puts it on an internal
    ``asyncio.Queue``; consumers fetch messages with ``recv()``. Errors are
    reported to the consumer as ``{"e": "error", "type": ..., "m": ...}``
    dictionaries on the same queue.

    The class is an async context manager: ``__aenter__`` connects and
    ``__aexit__`` (also reachable through ``close()``) tears everything down.
    """

    # Compared against the reconnect counter in _run_reconnect(); once the
    # counter reaches this value BinanceWebsocketUnableToConnect is raised.
    MAX_RECONNECTS = 5
    # Upper bound applied to the random backoff span in _get_reconnect_wait().
    MAX_RECONNECT_SECONDS = 60
    # Not referenced by the methods of this class.
    MIN_RECONNECT_WAIT = 0.1
    # Seconds to wait for a single socket/queue read before logging a timeout.
    TIMEOUT = 10
    # Not referenced by the methods of this class.
    NO_MESSAGE_RECONNECT_TIMEOUT = 60
    # The read loop raises BinanceWebsocketQueueOverflow once the queue holds
    # this many undelivered messages.
    MAX_QUEUE_SIZE = 100

    def __init__(
        self,
        url: str,
        path: Optional[str] = None,
        prefix: str = "ws/",
        is_binary: bool = False,
        exit_coro=None,
        https_proxy: Optional[str] = None,
        **kwargs,
    ):
        """Store the connection settings; no network activity happens here.

        Args:
            url: Base websocket URL.
            path: Stream path appended after ``prefix``; ``None`` means no path.
            prefix: Text inserted between ``url`` and ``path``.
            is_binary: When true, incoming messages are gzip-decompressed
                before being parsed as JSON.
            exit_coro: Optional coroutine function awaited with ``path`` when
                the socket is closed.
            https_proxy: Optional proxy URL; requires ``websockets_proxy``.
            **kwargs: Extra keyword arguments forwarded to the connect call.
        """
        self._loop = get_loop()
        self._log = logging.getLogger(__name__)
        self._path = path
        self._url = url
        self._exit_coro = exit_coro
        self._prefix = prefix
        self._reconnects = 0
        self._is_binary = is_binary
        self._conn = None
        self._socket = None
        self.ws: Optional[ws.WebSocketClientProtocol] = None  # type: ignore
        self.ws_state = WSListenerState.INITIALISING
        self._queue = asyncio.Queue()
        # Truthy while the read loop is scheduled/running; _read_loop() resets
        # it to None in its ``finally`` clause.
        self._handle_read_loop = None
        self._https_proxy = https_proxy
        self._ws_kwargs = kwargs

    def _build_ws_url(self) -> str:
        """Return ``url + prefix + path``, treating a missing/None part as ""."""
        prefix = getattr(self, "_prefix", "") or ""
        path = getattr(self, "_path", "") or ""
        return f"{self._url}{prefix}{path}"

    def json_dumps(self, msg) -> str:
        """Serialise ``msg`` to a JSON string, using orjson when available."""
        if orjson:
            # orjson.dumps() returns bytes; decode so the result type does not
            # depend on which JSON backend happens to be installed.
            return orjson.dumps(msg).decode("utf-8")
        return json.dumps(msg)

    def json_loads(self, msg):
        """Parse a JSON document, using orjson when available."""
        if orjson:
            return orjson.loads(msg)
        return json.loads(msg)

    async def __aenter__(self):
        """Connect and return this instance."""
        await self.connect()
        return self

    async def close(self):
        """Close the socket; equivalent to leaving the async context."""
        await self.__aexit__(None, None, None)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Run the exit coroutine, close the connection and stop the read loop."""
        self._log.debug(f"Closing Websocket {self._build_ws_url()}")
        if self._exit_coro:
            await self._exit_coro(self._path)
        # Set before closing so the read loop breaks instead of reconnecting.
        self.ws_state = WSListenerState.EXITING
        if self.ws:
            await self.ws.close()
        if self._conn and hasattr(self._conn, "protocol"):
            await self._conn.__aexit__(exc_type, exc_val, exc_tb)
        self.ws = None
        if self._handle_read_loop:
            await self._kill_read_loop()

    async def connect(self):
        """Open the websocket and make sure the read loop is scheduled.

        Raises:
            ImportError: ``https_proxy`` was given but ``websockets_proxy``
                is not installed.
            Exception: Whatever the underlying connect call raises; it is
                logged and re-raised with the state left at RECONNECTING.
        """
        self._log.debug("Establishing new WebSocket connection")
        self.ws_state = WSListenerState.RECONNECTING
        await self._before_connect()

        ws_url = self._build_ws_url()
        # Caller-supplied kwargs win, so passing ``close_timeout`` explicitly
        # overrides the default instead of raising a duplicate-keyword error.
        conn_kwargs = {"close_timeout": 0.1, **self._ws_kwargs}

        # handle https_proxy
        if self._https_proxy:
            if not Proxy or not proxy_connect:
                raise ImportError(
                    "websockets_proxy is not installed, please install it to use a websockets proxy (pip install websockets_proxy)"
                )
            proxy = Proxy.from_url(self._https_proxy)  # type: ignore
            self._conn = proxy_connect(ws_url, proxy=proxy, **conn_kwargs)  # type: ignore
        else:
            self._conn = ws.connect(ws_url, **conn_kwargs)  # type: ignore

        try:
            self.ws = await self._conn.__aenter__()
        except Exception as e:  # noqa
            self._log.error(f"Failed to connect to websocket: {e}")
            self.ws_state = WSListenerState.RECONNECTING
            raise
        self.ws_state = WSListenerState.STREAMING
        self._reconnects = 0
        await self._after_connect()
        if not self._handle_read_loop:
            self._handle_read_loop = self._loop.call_soon_threadsafe(
                asyncio.create_task, self._read_loop()
            )

    async def _kill_read_loop(self):
        """Ask the read loop to exit and poll until it has cleared its handle."""
        self.ws_state = WSListenerState.EXITING
        while self._handle_read_loop:
            await sleep(0.1)
        self._log.debug("Finished killing read_loop")

    async def _before_connect(self):
        """Hook awaited at the start of ``connect()``; a no-op here."""
        pass

    async def _after_connect(self):
        """Hook awaited after a successful connect; a no-op here."""
        pass

    def _handle_message(self, evt):
        """Decode one raw message: optional gzip decompression, then JSON.

        Every failure is logged and re-raised unchanged.
        """
        if self._is_binary:
            try:
                evt = gzip.decompress(evt)
            except (ValueError, OSError) as e:
                self._log.error(f"Failed to decompress message: {e}")
                raise
            except Exception as e:
                self._log.error(f"Unexpected decompression error: {e}")
                raise
        try:
            return self.json_loads(evt)
        except ValueError as e:
            self._log.error(f"JSON Value Error parsing message: Error: {e}")
            raise
        except TypeError as e:
            self._log.error(f"JSON Type Error parsing message. Error: {e}")
            raise
        except Exception as e:
            self._log.error(f"Unexpected error parsing message. Error: {e}")
            raise

    async def _read_loop(self):
        """Receive messages until the socket exits or an unrecoverable error occurs.

        Connection-level errors are queued as error messages and the loop
        continues (reconnecting when the socket is closed). Any other
        exception is queued as an error message and ends the loop.
        """
        try:
            while True:
                try:
                    while self.ws_state == WSListenerState.RECONNECTING:
                        await self._run_reconnect()

                    if self.ws_state == WSListenerState.EXITING:
                        self._log.debug(
                            f"_read_loop {self._path} break for {self.ws_state}"
                        )
                        break
                    elif self.ws.state == ws.protocol.State.CLOSING:  # type: ignore
                        await asyncio.sleep(0.1)
                        continue
                    elif self.ws.state == ws.protocol.State.CLOSED:  # type: ignore
                        self._reconnect()
                        raise BinanceWebsocketClosed(
                            "Connection closed. Reconnecting..."
                        )
                    elif self.ws_state == WSListenerState.STREAMING:
                        assert self.ws
                        res = await asyncio.wait_for(
                            self.ws.recv(), timeout=self.TIMEOUT
                        )
                        res = self._handle_message(res)
                        # Lazy %-formatting: skip building the string for
                        # every message when debug logging is disabled.
                        self._log.debug("Received message: %s", res)
                        if res:
                            if self._queue.qsize() < self.MAX_QUEUE_SIZE:
                                await self._queue.put(res)
                            else:
                                raise BinanceWebsocketQueueOverflow(
                                    f"Message queue size {self._queue.qsize()} exceeded maximum {self.MAX_QUEUE_SIZE}"
                                )
                except asyncio.TimeoutError:
                    self._log.debug(f"no message in {self.TIMEOUT} seconds")
                    # _no_message_received_reconnect
                except asyncio.CancelledError as e:
                    self._log.debug(f"_read_loop cancelled error {e}")
                    break
                except (
                    asyncio.IncompleteReadError,
                    gaierror,
                    ConnectionClosedError,
                    BinanceWebsocketClosed,
                ) as e:
                    # reports errors and continue loop
                    self._log.error(f"{e.__class__.__name__} ({e})")
                    await self._queue.put({
                        "e": "error",
                        "type": f"{e.__class__.__name__}",
                        "m": f"{e}",
                    })
                except Exception as e:
                    # Includes BinanceWebsocketUnableToConnect and
                    # BinanceWebsocketQueueOverflow: report and break the loop.
                    self._log.error(f"Unknown exception ({e})")
                    await self._queue.put({
                        "e": "error",
                        "type": e.__class__.__name__,
                        "m": f"{e}",
                    })
                    break
        finally:
            self._handle_read_loop = None  # Signal the coro is stopped
            self._reconnects = 0

    async def _run_reconnect(self):
        """Perform one reconnect attempt after a randomized backoff.

        Raises:
            BinanceWebsocketUnableToConnect: The reconnect counter has
                reached ``MAX_RECONNECTS``.
        """
        await self.before_reconnect()
        if self._reconnects < self.MAX_RECONNECTS:
            reconnect_wait = self._get_reconnect_wait(self._reconnects)
            self._log.debug(
                f"websocket reconnecting. {self.MAX_RECONNECTS - self._reconnects} reconnects left - "
                f"waiting {reconnect_wait}"
            )
            await asyncio.sleep(reconnect_wait)
            try:
                await self.connect()
            except Exception as e:
                # connect() has already logged the failure and left the state
                # at RECONNECTING, so the read loop will call us again; record
                # which attempt failed instead of swallowing it silently.
                self._log.debug(
                    f"websocket reconnect attempt {self._reconnects} failed: {e}"
                )
        else:
            self._log.error(f"Max reconnections {self.MAX_RECONNECTS} reached:")
            # Signal the error
            raise BinanceWebsocketUnableToConnect

    async def recv(self):
        """Return the next truthy item from the queue.

        Waits in ``TIMEOUT``-second slices, logging each timeout, and keeps
        waiting until an item arrives.
        """
        res = None
        while not res:
            try:
                res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT)
            except asyncio.TimeoutError:
                self._log.debug(f"no message in {self.TIMEOUT} seconds")
        return res

    async def _wait_for_reconnect(self):
        """Poll until the state is either STREAMING or EXITING."""
        while (
            self.ws_state != WSListenerState.STREAMING
            and self.ws_state != WSListenerState.EXITING
        ):
            await sleep(0.1)

    def _get_reconnect_wait(self, attempts: int) -> int:
        """Return a randomized wait in whole seconds for the given attempt count.

        The random span is ``2**attempts - 1`` capped at
        ``MAX_RECONNECT_SECONDS``; one second is always added before rounding.
        """
        expo = 2**attempts
        return round(random() * min(self.MAX_RECONNECT_SECONDS, expo - 1) + 1)

    async def before_reconnect(self):
        """Drop the current socket, exit the old connection and count the attempt."""
        if self.ws:
            self.ws = None

        if self._conn and hasattr(self._conn, "protocol"):
            await self._conn.__aexit__(None, None, None)

        self._reconnects += 1

    def _reconnect(self):
        """Flag the socket as needing a reconnect; the read loop acts on it."""
        self.ws_state = WSListenerState.RECONNECTING
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc