• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 9394559170

26 Apr 2024 06:36AM UTC coverage: 94.656% (-0.02%) from 94.674%
9394559170

push

github

xmatthias
Loader should be passed as kwarg for clarity

20280 of 21425 relevant lines covered (94.66%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.45
/freqtrade/rpc/api_server/deps.py
1
from typing import Any, AsyncIterator, Dict, Optional
1✔
2
from uuid import uuid4
1✔
3

4
from fastapi import Depends, HTTPException
1✔
5

6
from freqtrade.constants import Config
1✔
7
from freqtrade.enums import RunMode
1✔
8
from freqtrade.persistence import Trade
1✔
9
from freqtrade.persistence.models import _request_id_ctx_var
1✔
10
from freqtrade.rpc.api_server.webserver_bgwork import ApiBG
1✔
11
from freqtrade.rpc.rpc import RPC, RPCException
1✔
12

13
from .webserver import ApiServer
1✔
14

15

16
def get_rpc_optional() -> Optional[RPC]:
1✔
17
    if ApiServer._has_rpc:
1✔
18
        return ApiServer._rpc
1✔
19
    return None
×
20

21

22
async def get_rpc() -> Optional[AsyncIterator[RPC]]:
1✔
23

24
    _rpc = get_rpc_optional()
1✔
25
    if _rpc:
1✔
26
        request_id = str(uuid4())
1✔
27
        ctx_token = _request_id_ctx_var.set(request_id)
1✔
28
        Trade.rollback()
1✔
29
        try:
1✔
30
            yield _rpc
1✔
31
        finally:
32
            Trade.session.remove()
1✔
33
            _request_id_ctx_var.reset(ctx_token)
1✔
34

35
    else:
36
        raise RPCException('Bot is not in the correct state')
×
37

38

39
def get_config() -> Dict[str, Any]:
1✔
40
    return ApiServer._config
1✔
41

42

43
def get_api_config() -> Dict[str, Any]:
1✔
44
    return ApiServer._config['api_server']
1✔
45

46

47
def _generate_exchange_key(config: Config) -> str:
1✔
48
    """
49
    Exchange key - used for caching the exchange object.
50
    """
51
    return f"{config['exchange']['name']}_{config.get('trading_mode', 'spot')}"
1✔
52

53

54
def get_exchange(config=Depends(get_config)):
1✔
55
    exchange_key = _generate_exchange_key(config)
1✔
56
    if not (exchange := ApiBG.exchanges.get(exchange_key)):
1✔
57
        from freqtrade.resolvers import ExchangeResolver
1✔
58
        exchange = ExchangeResolver.load_exchange(
1✔
59
            config, validate=False, load_leverage_tiers=False)
60
        ApiBG.exchanges[exchange_key] = exchange
1✔
61
    return exchange
1✔
62

63

64
def get_message_stream():
1✔
65
    return ApiServer._message_stream
1✔
66

67

68
def is_webserver_mode(config=Depends(get_config)):
1✔
69
    if config['runmode'] != RunMode.WEBSERVER:
1✔
70
        raise HTTPException(status_code=503,
1✔
71
                            detail='Bot is not in the correct state.')
72
    return None
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc