• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 9394559170

26 Apr 2024 06:36AM UTC coverage: 94.656% (-0.02%) from 94.674%
9394559170

push

github

xmatthias
Loader should be passed as kwarg for clarity

20280 of 21425 relevant lines covered (94.66%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.36
/freqtrade/rpc/api_server/api_ws.py
1
import logging
1✔
2
import time
1✔
3
from typing import Any, Dict
1✔
4

5
from fastapi import APIRouter, Depends
1✔
6
from fastapi.websockets import WebSocket
1✔
7
from pydantic import ValidationError
1✔
8

9
from freqtrade.enums import RPCMessageType, RPCRequestType
1✔
10
from freqtrade.exceptions import FreqtradeException
1✔
11
from freqtrade.rpc.api_server.api_auth import validate_ws_token
1✔
12
from freqtrade.rpc.api_server.deps import get_message_stream, get_rpc
1✔
13
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
1✔
14
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
1✔
15
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSErrorMessage,
1✔
16
                                                 WSMessageSchema, WSRequestSchema,
17
                                                 WSWhitelistMessage)
18
from freqtrade.rpc.rpc import RPC
1✔
19

20

21
logger = logging.getLogger(__name__)
1✔
22

23
# Private router, protected by API Key authentication
24
router = APIRouter()
1✔
25

26

27
async def channel_reader(channel: WebSocketChannel, rpc: RPC):
1✔
28
    """
29
    Iterate over the messages from the channel and process the request
30
    """
31
    async for message in channel:
1✔
32
        try:
1✔
33
            await _process_consumer_request(message, channel, rpc)
1✔
34
        except FreqtradeException:
1✔
35
            logger.exception(f"Error processing request from {channel}")
×
36
            response = WSErrorMessage(data='Error processing request')
×
37

38
            await channel.send(response.dict(exclude_none=True))
×
39

40

41
async def channel_broadcaster(channel: WebSocketChannel, message_stream: MessageStream):
1✔
42
    """
43
    Iterate over messages in the message stream and send them
44
    """
45
    async for message, ts in message_stream:
1✔
46
        if channel.subscribed_to(message.get('type')):
×
47
            # Log a warning if this channel is behind
48
            # on the message stream by a lot
49
            if (time.time() - ts) > 60:
×
50
                logger.warning(f"Channel {channel} is behind MessageStream by 1 minute,"
×
51
                               " this can cause a memory leak if you see this message"
52
                               " often, consider reducing pair list size or amount of"
53
                               " consumers.")
54

55
            await channel.send(message, timeout=True)
×
56

57

58
async def _process_consumer_request(
1✔
59
    request: Dict[str, Any],
60
    channel: WebSocketChannel,
61
    rpc: RPC
62
):
63
    """
64
    Validate and handle a request from a websocket consumer
65
    """
66
    # Validate the request, makes sure it matches the schema
67
    try:
1✔
68
        websocket_request = WSRequestSchema.model_validate(request)
1✔
69
    except ValidationError as e:
×
70
        logger.error(f"Invalid request from {channel}: {e}")
×
71
        return
×
72

73
    type_, data = websocket_request.type, websocket_request.data
1✔
74
    response: WSMessageSchema
75

76
    logger.debug(f"Request of type {type_} from {channel}")
1✔
77

78
    # If we have a request of type SUBSCRIBE, set the topics in this channel
79
    if type_ == RPCRequestType.SUBSCRIBE:
1✔
80
        # If the request is empty, do nothing
81
        if not data:
1✔
82
            return
×
83

84
        # If all topics passed are a valid RPCMessageType, set subscriptions on channel
85
        if all([any(x.value == topic for x in RPCMessageType) for topic in data]):
1✔
86
            channel.set_subscriptions(data)
1✔
87

88
        # We don't send a response for subscriptions
89
        return
1✔
90

91
    elif type_ == RPCRequestType.WHITELIST:
1✔
92
        # Get whitelist
93
        whitelist = rpc._ws_request_whitelist()
1✔
94

95
        # Format response
96
        response = WSWhitelistMessage(data=whitelist)
1✔
97
        await channel.send(response.model_dump(exclude_none=True))
1✔
98

99
    elif type_ == RPCRequestType.ANALYZED_DF:
1✔
100
        # Limit the amount of candles per dataframe to 'limit' or 1500
101
        limit = int(min(data.get('limit', 1500), 1500)) if data else None
1✔
102
        pair = data.get('pair', None) if data else None
1✔
103

104
        # For every pair in the generator, send a separate message
105
        for message in rpc._ws_request_analyzed_df(limit, pair):
1✔
106
            # Format response
107
            response = WSAnalyzedDFMessage(data=message)
1✔
108
            await channel.send(response.model_dump(exclude_none=True))
1✔
109

110

111
@router.websocket("/message/ws")
1✔
112
async def message_endpoint(
1✔
113
    websocket: WebSocket,
114
    token: str = Depends(validate_ws_token),
115
    rpc: RPC = Depends(get_rpc),
116
    message_stream: MessageStream = Depends(get_message_stream)
117
):
118
    if token:
1✔
119
        async with create_channel(websocket) as channel:
1✔
120
            await channel.run_channel_tasks(
1✔
121
                channel_reader(channel, rpc),
122
                channel_broadcaster(channel, message_stream)
123
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc