• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sammchardy / python-binance / 11577620865

29 Oct 2024 03:51PM UTC coverage: 57.804% (-0.01%) from 57.817%
11577620865

push

github

web-flow
chore: ruff linting (#1460)

* add pyproject

* ruff fix

* fix spacing

* fix tests

* more fixes

* update python-app

* reformat

* disable reformat for now

* add any

* add decorator

* add test

396 of 1299 new or added lines in 6 files covered. (30.48%)

47 existing lines in 5 files now uncovered.

1985 of 3434 relevant lines covered (57.8%)

2.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.37
/binance/depthcache.py
1
import logging
4✔
2
from operator import itemgetter
4✔
3
import asyncio
4✔
4
import time
4✔
5
from typing import Optional, Dict, Callable
4✔
6

7
from .helpers import get_loop
4✔
8
from .streams import BinanceSocketManager
4✔
9
from .threaded_stream import ThreadedApiManager
4✔
10

11

12
class DepthCache(object):
    """In-memory order book (bids and asks) for a single symbol.

    Each side is stored as a ``{price: quantity}`` mapping keyed by the raw
    price value from the incoming level, with the quantity converted through
    ``conv_type``.
    """

    def __init__(self, symbol, conv_type: Callable = float):
        """Initialise the DepthCache

        :param symbol: Symbol to create depth cache for
        :type symbol: string
        :param conv_type: Optional type to represent price, and amount, default is float.
        :type conv_type: function.

        """
        self.symbol = symbol
        self._bids = {}
        self._asks = {}
        self.update_time = None
        self.conv_type: Callable = conv_type
        self._log = logging.getLogger(__name__)

    @staticmethod
    def _is_zero_quantity(raw_quantity) -> bool:
        """Return True when ``raw_quantity`` is numerically zero.

        The check is numeric rather than a comparison against the literal
        ``"0.00000000"`` so that zero quantities sent with a different
        precision (``"0"``, ``"0.000"``, ...) are recognised as well.
        Values that cannot be read as a number are treated as non-zero.
        """
        try:
            return float(raw_quantity) == 0
        except (TypeError, ValueError):
            return False

    def _set_level(self, book, level):
        """Store ``level`` (``[price, quantity, ...]``) in ``book``, or drop it when the quantity is zero."""
        price, raw_quantity = level[0], level[1]
        # convert first so an unparsable quantity still raises, as before
        quantity = self.conv_type(raw_quantity)
        if self._is_zero_quantity(raw_quantity):
            book.pop(price, None)
        else:
            book[price] = quantity

    def add_bid(self, bid):
        """Add a bid to the cache

        A quantity of zero removes the price level instead of storing it.

        :param bid: sequence whose first item is the price and second the quantity
        :return: None

        """
        self._set_level(self._bids, bid)

    def add_ask(self, ask):
        """Add an ask to the cache

        A quantity of zero removes the price level instead of storing it.

        :param ask: sequence whose first item is the price and second the quantity
        :return: None

        """
        self._set_level(self._asks, ask)

    def get_bids(self):
        """Get the current bids

        :return: list of bids with price and quantity as conv_type, highest price first

        .. code-block:: python

            [
                [
                    0.0001946,  # Price
                    45.0        # Quantity
                ],
                [
                    0.00019459,
                    2384.0
                ],
                [
                    0.00019158,
                    5219.0
                ],
                [
                    0.00019157,
                    1180.0
                ],
                [
                    0.00019082,
                    287.0
                ]
            ]

        """
        return DepthCache.sort_depth(self._bids, reverse=True, conv_type=self.conv_type)

    def get_asks(self):
        """Get the current asks

        :return: list of asks with price and quantity as conv_type, lowest price first

        .. code-block:: python

            [
                [
                    0.0001955,  # Price
                    57.0        # Quantity
                ],
                [
                    0.00019699,
                    778.0
                ],
                [
                    0.000197,
                    64.0
                ],
                [
                    0.00019709,
                    1130.0
                ],
                [
                    0.0001971,
                    385.0
                ]
            ]

        """
        return DepthCache.sort_depth(
            self._asks, reverse=False, conv_type=self.conv_type
        )

    @staticmethod
    def sort_depth(vals, reverse=False, conv_type: Callable = float):
        """Sort bids or asks by price

        :param vals: ``{price: quantity}`` dict or list of ``[price, quantity]`` pairs
        :param reverse: sort descending by price when True
        :param conv_type: callable applied to every price and quantity
        :return: list of ``[price, quantity]`` lists sorted by converted price
        :raises ValueError: if ``vals`` is neither a dict nor a list

        """
        if isinstance(vals, dict):
            lst = [
                [conv_type(price), conv_type(quantity)]
                for price, quantity in vals.items()
            ]
        elif isinstance(vals, list):
            lst = [[conv_type(price), conv_type(quantity)] for price, quantity in vals]
        else:
            raise ValueError(f"Unknown order book depth data type: {type(vals)}")
        lst = sorted(lst, key=itemgetter(0), reverse=reverse)
        return lst
4✔
133

134

135
DEFAULT_REFRESH = 60 * 30  # 30 minutes


class BaseDepthCacheManager:
    """Shared machinery for depth cache managers.

    Subclasses supply the websocket through ``_get_socket``; this class
    owns the cache object, the receive loop and the periodic refresh.
    """

    # seconds to wait for a single socket message in ``recv``
    TIMEOUT = 60

    def __init__(
        self,
        client,
        symbol,
        loop=None,
        refresh_interval: Optional[int] = DEFAULT_REFRESH,
        bm=None,
        limit=10,
        conv_type=float,
    ):
        """Create a DepthCacheManager instance

        :param client: Binance API client
        :type client: binance.Client
        :param loop:
        :type loop:
        :param symbol: Symbol to create depth cache for
        :type symbol: string
        :param refresh_interval: Optional number of seconds between cache refresh, use 0 or None to disable
        :type refresh_interval: int
        :param bm: Optional BinanceSocketManager
        :type bm: BinanceSocketManager
        :param limit: Optional number of orders to get from orderbook
        :type limit: int
        :param conv_type: Optional type to represent price, and amount, default is float.
        :type conv_type: function.

        """

        self._client = client
        self._depth_cache = None
        self._loop = loop or get_loop()
        self._symbol = symbol
        self._limit = limit
        self._last_update_id = None
        self._bm = bm or BinanceSocketManager(self._client)
        self._refresh_interval = refresh_interval
        self._conn_key = None
        self._conv_type = conv_type
        self._log = logging.getLogger(__name__)
        # populated by _start_socket(); None until the manager is entered
        self._socket = None

    async def __aenter__(self):
        """Initialise the cache, create the socket and enter the socket's context."""
        await asyncio.gather(self._init_cache(), self._start_socket())
        await self._socket.__aenter__()
        return self

    async def __aexit__(self, *args, **kwargs):
        """Leave the socket's context."""
        await self._socket.__aexit__(*args, **kwargs)

    async def recv(self):
        """Wait for the next socket message that yields a depth cache.

        Each socket read is bounded by ``TIMEOUT`` seconds. Any exception from
        the read (including a timeout) is logged as a warning and the read is
        retried; messages for which ``_depth_event`` returns a falsy value are
        skipped.

        :return: the value returned by ``_depth_event``
        :raises RuntimeError: if the socket has not been started yet
        """
        # Without this guard the AttributeError raised by touching a missing
        # socket would be swallowed by the handler below, looping forever.
        if getattr(self, "_socket", None) is None:
            raise RuntimeError(
                "Depth cache socket not started; enter the manager with 'async with' first"
            )

        dc = None
        while not dc:
            try:
                res = await asyncio.wait_for(self._socket.recv(), timeout=self.TIMEOUT)
            except Exception as e:
                self._log.warning(e)
            else:
                dc = await self._depth_event(res)
        return dc

    async def _init_cache(self):
        """Create a fresh, empty depth cache and schedule the next refresh

        :return:
        """

        # initialise or clear depth cache
        self._depth_cache = DepthCache(self._symbol, conv_type=self._conv_type)

        # set a time to refresh the depth cache
        if self._refresh_interval:
            self._refresh_time = int(time.time()) + self._refresh_interval

    async def _start_socket(self):
        """Start the depth cache socket

        :return:
        """
        self._socket = self._get_socket()

    def _get_socket(self):
        """Return the socket to read depth messages from; subclasses must override."""
        raise NotImplementedError

    async def _depth_event(self, msg):
        """Handle a depth event

        :param msg: message received from the socket
        :return: result of ``_process_depth_message``, or None for an empty
            or error message

        """

        if not msg:
            return None

        if "e" in msg and msg["e"] == "error":
            # drop the cache via close()
            await self.close()

            # notify the user by returning a None value
            return None

        return await self._process_depth_message(msg)

    async def _process_depth_message(self, msg):
        """Process a depth event message.

        :param msg: Depth event message.
        :return: the depth cache the message was applied to

        """

        # add any bid or ask values
        self._apply_orders(msg)

        # keep a reference to the cache that was just updated; a refresh
        # below replaces self._depth_cache with a new object
        res = self._depth_cache

        # after processing event see if we need to refresh the depth cache
        if self._refresh_interval and int(time.time()) > self._refresh_time:
            await self._init_cache()

        return res

    def _apply_orders(self, msg):
        """Add the bids and asks in ``msg`` to the cache and record its update time.

        Bids are read from the ``b`` and ``bids`` keys, asks from ``a`` and
        ``asks``; the update time is ``E`` or, failing that, ``lastUpdateId``.
        """
        assert self._depth_cache
        for bid in msg.get("b", []) + msg.get("bids", []):
            self._depth_cache.add_bid(bid)
        for ask in msg.get("a", []) + msg.get("asks", []):
            self._depth_cache.add_ask(ask)

        # keeping update time
        self._depth_cache.update_time = msg.get("E") or msg.get("lastUpdateId")

    def get_depth_cache(self):
        """Get the current depth cache

        :return: DepthCache object

        """
        return self._depth_cache

    async def close(self):
        """Discard the depth cache held by this manager

        Only the cache reference is cleared here; the socket itself is left
        to ``__aexit__``.

        :return:
        """
        self._depth_cache = None

    def get_symbol(self):
        """Get the symbol

        :return: symbol
        """
        return self._symbol
×
295

296

297
class DepthCacheManager(BaseDepthCacheManager):
    """Depth cache manager seeded from a REST order book snapshot and kept
    current from the depth websocket, with update-id sequence checking."""

    def __init__(
        self,
        client,
        symbol,
        loop=None,
        refresh_interval: Optional[int] = None,
        bm=None,
        limit=500,
        conv_type=float,
        ws_interval=None,
    ):
        """Initialise the DepthCacheManager

        :param client: Binance API client
        :type client: binance.Client
        :param loop: asyncio loop
        :param symbol: Symbol to create depth cache for
        :type symbol: string
        :param refresh_interval: Optional number of seconds between cache refresh, use 0 or None to disable
        :type refresh_interval: int
        :param limit: Optional number of orders to get from orderbook
        :type limit: int
        :param conv_type: Optional type to represent price, and amount, default is float.
        :type conv_type: function.
        :param ws_interval: Optional interval for updates on websocket, default None. If not set, updates happen every second. Must be 0, None (1s) or 100 (100ms).
        :type ws_interval: int

        """
        super().__init__(client, symbol, loop, refresh_interval, bm, limit, conv_type)
        self._ws_interval = ws_interval

    async def _init_cache(self):
        """Initialise the depth cache calling REST endpoint

        While the snapshot is being fetched ``_last_update_id`` is None, so
        incoming websocket messages are buffered by
        ``_process_depth_message`` and replayed once the snapshot is applied.

        :return:
        """
        self._last_update_id = None
        self._depth_message_buffer = []

        res = await self._client.get_order_book(symbol=self._symbol, limit=self._limit)

        # initialise or clear depth cache
        await super()._init_cache()

        # process bid and asks from the order book; _apply_orders reads the
        # snapshot's "bids"/"asks" keys, so one pass is enough
        self._apply_orders(res)

        # set first update id
        self._last_update_id = res["lastUpdateId"]

        # Apply any updates from the websocket
        for msg in self._depth_message_buffer:
            await self._process_depth_message(msg)

        # clear the depth buffer
        self._depth_message_buffer = []

    async def _start_socket(self):
        """Start the depth cache socket

        :return:
        """
        # make sure the buffer exists before messages can arrive
        if not getattr(self, "_depth_message_buffer", None):
            self._depth_message_buffer = []

        await super()._start_socket()

    def _get_socket(self):
        """Return the depth socket for this symbol at the configured websocket interval."""
        return self._bm.depth_socket(self._symbol, interval=self._ws_interval)

    async def _process_depth_message(self, msg):
        """Process a depth event message.

        :param msg: Depth event message.
        :return: the depth cache the message was applied to, or None when
            the message was buffered or ignored

        """

        if self._last_update_id is None:
            # Initial depth snapshot fetch not yet performed, buffer messages
            self._depth_message_buffer.append(msg)
            return

        if msg["u"] <= self._last_update_id:
            # ignore any updates before the initial update id
            return
        elif msg["U"] != self._last_update_id + 1:
            # if not buffered check we get sequential updates
            # otherwise init cache again
            await self._init_cache()

        # add any bid or ask values
        self._apply_orders(msg)

        # keep a reference to the cache that was just updated
        res = self._depth_cache

        self._last_update_id = msg["u"]

        # after processing event see if we need to refresh the depth cache
        if self._refresh_interval and int(time.time()) > self._refresh_time:
            await self._init_cache()

        return res
×
407

408

409
class FuturesDepthCacheManager(BaseDepthCacheManager):
    """Depth cache manager reading from the futures depth socket.

    Every message replaces the cached bids and asks wholesale rather than
    merging individual levels.
    """

    async def _process_depth_message(self, msg):
        """Process a depth event message.

        :param msg: Depth event message; its ``data`` entry is what gets processed.
        :return: result of the base class processing

        """
        payload = msg.get("data")
        return await super()._process_depth_message(payload)

    def _apply_orders(self, msg):
        """Overwrite both sides of the cache with the ``b``/``a`` entries of ``msg``."""
        assert self._depth_cache
        cache = self._depth_cache
        cache._bids = msg.get("b", [])
        cache._asks = msg.get("a", [])

        # keeping update time
        cache.update_time = msg.get("E") or msg.get("lastUpdateId")

    def _get_socket(self):
        """Return the futures depth socket for this symbol."""
        return self._bm.futures_depth_socket(self._symbol)
×
431

432

433
class OptionsDepthCacheManager(BaseDepthCacheManager):
    """Depth cache manager reading from the options depth socket."""

    def _get_socket(self):
        """Return the options depth socket for this symbol."""
        open_socket = self._bm.options_depth_socket
        return open_socket(self._symbol)
×
436

437

438
class ThreadedDepthCacheManager(ThreadedApiManager):
    """Callback-based front end that starts depth cache managers on the
    manager's event loop."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        requests_params: Optional[Dict[str, str]] = None,
        tld: str = "com",
        testnet: bool = False,
    ):
        """Forward the connection settings to ``ThreadedApiManager``."""
        super().__init__(api_key, api_secret, requests_params, tld, testnet)

    def _start_depth_cache(
        self,
        dcm_class,
        callback: Callable,
        symbol: str,
        refresh_interval=None,
        bm=None,
        limit=10,
        conv_type=float,
        **kwargs,
    ) -> str:
        """Create a ``dcm_class`` manager and schedule a listener for it.

        :param dcm_class: depth cache manager class to instantiate
        :param callback: callable handed to ``start_listener``
        :param symbol: Symbol to create depth cache for
        :param refresh_interval: forwarded to ``dcm_class``
        :param bm: forwarded to ``dcm_class``
        :param limit: forwarded to ``dcm_class`` and used in the returned key
        :param conv_type: forwarded to ``dcm_class``
        :param kwargs: extra keyword arguments for ``dcm_class``
        :return: the key (``<symbol>@depth<limit>``) under which the listener is registered
        """
        # block until the client is available
        # NOTE(review): presumably it is created on the manager's own thread; confirm in ThreadedApiManager
        while not self._client:
            time.sleep(0.01)

        dcm = dcm_class(
            client=self._client,
            symbol=symbol,
            loop=self._loop,
            refresh_interval=refresh_interval,
            bm=bm,
            limit=limit,
            conv_type=conv_type,
            **kwargs,
        )
        path = symbol.lower() + "@depth" + str(limit)
        self._socket_running[path] = True
        # call_soon is not thread-safe; the thread-safe variant also wakes a
        # loop that is idle in another thread and is equally valid when
        # called from the loop's own thread
        self._loop.call_soon_threadsafe(
            asyncio.create_task, self.start_listener(dcm, path, callback)
        )
        return path

    def start_depth_cache(
        self,
        callback: Callable,
        symbol: str,
        refresh_interval=None,
        bm=None,
        limit=10,
        conv_type=float,
        ws_interval=0,
    ) -> str:
        """Start a ``DepthCacheManager`` for ``symbol``; returns the listener key."""
        return self._start_depth_cache(
            dcm_class=DepthCacheManager,
            callback=callback,
            symbol=symbol,
            refresh_interval=refresh_interval,
            bm=bm,
            limit=limit,
            conv_type=conv_type,
            ws_interval=ws_interval,
        )

    def start_futures_depth_socket(
        self,
        callback: Callable,
        symbol: str,
        refresh_interval=None,
        bm=None,
        limit=10,
        conv_type=float,
    ) -> str:
        """Start a ``FuturesDepthCacheManager`` for ``symbol``; returns the listener key."""
        return self._start_depth_cache(
            dcm_class=FuturesDepthCacheManager,
            callback=callback,
            symbol=symbol,
            refresh_interval=refresh_interval,
            bm=bm,
            limit=limit,
            conv_type=conv_type,
        )

    def start_options_depth_socket(
        self,
        callback: Callable,
        symbol: str,
        refresh_interval=None,
        bm=None,
        limit=10,
        conv_type=float,
    ) -> str:
        """Start an ``OptionsDepthCacheManager`` for ``symbol``; returns the listener key."""
        return self._start_depth_cache(
            dcm_class=OptionsDepthCacheManager,
            callback=callback,
            symbol=symbol,
            refresh_interval=refresh_interval,
            bm=bm,
            limit=limit,
            conv_type=conv_type,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc