• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 9394559170

26 Apr 2024 06:36AM UTC coverage: 94.656% (-0.02%) from 94.674%
9394559170

push

github

xmatthias
Loader should be passed as kwarg for clarity

20280 of 21425 relevant lines covered (94.66%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.2
/freqtrade/edge/edge_positioning.py
1
# pragma pylint: disable=W0603
2
""" Edge positioning package """
1✔
3
import logging
1✔
4
from collections import defaultdict
1✔
5
from copy import deepcopy
1✔
6
from datetime import timedelta
1✔
7
from typing import Any, Dict, List, NamedTuple
1✔
8

9
import numpy as np
1✔
10
import utils_find_1st as utf1st
1✔
11
from pandas import DataFrame
1✔
12

13
from freqtrade.configuration import TimeRange
1✔
14
from freqtrade.constants import DATETIME_PRINT_FORMAT, UNLIMITED_STAKE_AMOUNT, Config
1✔
15
from freqtrade.data.history import get_timerange, load_data, refresh_data
1✔
16
from freqtrade.enums import CandleType, ExitType, RunMode
1✔
17
from freqtrade.exceptions import OperationalException
1✔
18
from freqtrade.exchange import timeframe_to_seconds
1✔
19
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
1✔
20
from freqtrade.strategy.interface import IStrategy
1✔
21
from freqtrade.util import dt_now
1✔
22

23

24
logger = logging.getLogger(__name__)
1✔
25

26

27
class PairInfo(NamedTuple):
    """
    Edge statistics for one pair, as built by Edge._process_expectancy.

    Field order is part of the interface: instances are created positionally.
    """

    # Stoploss ratio the remaining figures were calculated for.
    stoploss: float
    # Number of winning trades divided by the number of all trades.
    winrate: float
    # Average win divided by average loss.
    risk_reward_ratio: float
    # (1 / winrate) - 1
    required_risk_reward: float
    # (risk_reward_ratio * winrate) - (1 - winrate)
    expectancy: float
    # Number of trades the statistics are based on.
    nb_trades: int
    # Mean trade duration (trade durations are stored in minutes).
    avg_trade_duration: float
1✔
35

36

37
class Edge:
1✔
38
    """
39
    Calculates Win Rate, Risk Reward Ratio, Expectancy
40
    against historical data for a given set of markets and a strategy
41
    it then adjusts stoploss and position size accordingly
42
    and forces them into the strategy
43
    Author: https://github.com/mishaker
44
    """
45

46
    _cached_pairs: Dict[str, Any] = {}  # Keeps a list of pairs
1✔
47

48
    def __init__(self, config: Config, exchange, strategy) -> None:
        """
        Set up Edge from the bot configuration.

        :param config: Bot configuration. Edge settings are read from the optional
            'edge' section; 'max_open_trades', 'stake_amount',
            'tradable_balance_ratio' and 'exchange' are read from the top level.
        :param exchange: Exchange object, used here to look up the trading fee.
        :param strategy: Strategy whose signals Edge analyses.
        :raises OperationalException: if 'stake_amount' is not unlimited.
        """
        self.config = config
        self.exchange = exchange
        self.strategy: IStrategy = strategy

        self.edge_config = self.config.get('edge', {})
        self._cached_pairs: Dict[str, Any] = {}  # pair -> PairInfo of the last calculation
        self._final_pairs: list = []

        # checking max_open_trades. it should be -1 as with Edge
        # the number of trades is determined by position size
        if self.config['max_open_trades'] != float('inf'):
            logger.critical('max_open_trades should be -1 in config !')

        if self.config['stake_amount'] != UNLIMITED_STAKE_AMOUNT:
            raise OperationalException('Edge works only with unlimited stake amount')

        self._capital_ratio: float = self.config['tradable_balance_ratio']
        self._allowed_risk: float = self.edge_config.get('allowed_risk')
        self._since_number_of_days: int = self.edge_config.get('calculate_since_number_of_days', 14)
        self._last_updated: int = 0  # Timestamp of pairs last updated time
        self._refresh_pairs = True

        self._stoploss_range_min = float(self.edge_config.get('stoploss_range_min', -0.01))
        self._stoploss_range_max = float(self.edge_config.get('stoploss_range_max', -0.05))
        self._stoploss_range_step = float(self.edge_config.get('stoploss_range_step', -0.001))

        # calculating stoploss range
        self._stoploss_range = np.arange(
            self._stoploss_range_min,
            self._stoploss_range_max,
            self._stoploss_range_step
        )

        # Open-ended timerange starting `calculate_since_number_of_days` days ago.
        first_day = dt_now() - timedelta(days=self._since_number_of_days)
        self._timerange: TimeRange = TimeRange.parse_timerange(
            f"{first_day.strftime('%Y%m%d')}-")

        # An explicitly configured fee wins - including a fee of 0, which a plain
        # truthiness check would wrongly treat as "not configured".
        configured_fee = config.get('fee')
        if configured_fee is not None:
            self.fee = configured_fee
        else:
            try:
                self.fee = self.exchange.get_fee(symbol=expand_pairlist(
                    self.config['exchange']['pair_whitelist'], list(self.exchange.markets))[0])
            except IndexError:
                # Whitelist expanded to nothing; calculate() retries the fee lookup later.
                self.fee = None
1✔
93

94
    def calculate(self, pairs: List[str]) -> bool:
        """
        Recalculate the Edge statistics for the given pairs and cache the result.

        :param pairs: Pairs to analyse.
        :return: True if the cached pair statistics were refreshed. False if the
            throttle interval ('process_throttle_secs' of the edge config) has not
            elapsed yet, no data could be loaded, or no trades were found.
        """
        if self.fee is None and pairs:
            self.fee = self.exchange.get_fee(pairs[0])

        heartbeat = self.edge_config.get('process_throttle_secs')

        if (self._last_updated > 0) and (
                self._last_updated + heartbeat > int(dt_now().timestamp())):
            return False

        logger.info('Using stake_currency: %s ...', self.config['stake_currency'])
        logger.info('Using local backtesting data (using whitelist in given config) ...')

        if self._refresh_pairs:
            # Extend the start of the range so the strategy's startup candles are available.
            timerange_startup = deepcopy(self._timerange)
            timerange_startup.subtract_start(timeframe_to_seconds(
                self.strategy.timeframe) * self.strategy.startup_candle_count)
            refresh_data(
                datadir=self.config['datadir'],
                pairs=pairs,
                exchange=self.exchange,
                timeframe=self.strategy.timeframe,
                timerange=timerange_startup,
                data_format=self.config['dataformat_ohlcv'],
                candle_type=self.config.get('candle_type_def', CandleType.SPOT),
            )
            # Download informative pairs too
            res = defaultdict(list)
            for pair, timeframe, _ in self.strategy.gather_informative_pairs():
                res[timeframe].append(pair)
            for timeframe, inf_pairs in res.items():
                timerange_startup = deepcopy(self._timerange)
                timerange_startup.subtract_start(timeframe_to_seconds(
                    timeframe) * self.strategy.startup_candle_count)
                refresh_data(
                    datadir=self.config['datadir'],
                    pairs=inf_pairs,
                    exchange=self.exchange,
                    timeframe=timeframe,
                    timerange=timerange_startup,
                    data_format=self.config['dataformat_ohlcv'],
                    candle_type=self.config.get('candle_type_def', CandleType.SPOT),
                )

        data: Dict[str, Any] = load_data(
            datadir=self.config['datadir'],
            pairs=pairs,
            timeframe=self.strategy.timeframe,
            timerange=self._timerange,
            startup_candles=self.strategy.startup_candle_count,
            data_format=self.config['dataformat_ohlcv'],
            candle_type=self.config.get('candle_type_def', CandleType.SPOT),
        )

        if not data:
            # Reinitializing cached pairs
            self._cached_pairs = {}
            logger.critical("No data found. Edge is stopped ...")
            return False

        # Fake run-mode to Edge while indicators are calculated. The previous
        # run-mode is restored even if the strategy raises, so the shared config
        # is never left in EDGE mode.
        prior_rm = self.config['runmode']
        self.config['runmode'] = RunMode.EDGE
        try:
            preprocessed = self.strategy.advise_all_indicators(data)
        finally:
            self.config['runmode'] = prior_rm

        # Print timeframe
        min_date, max_date = get_timerange(preprocessed)
        logger.info(f'Measuring data from {min_date.strftime(DATETIME_PRINT_FORMAT)} '
                    f'up to {max_date.strftime(DATETIME_PRINT_FORMAT)} '
                    f'({(max_date - min_date).days} days)..')
        # TODO: Should edge support shorts? needs to be investigated further
        # * (add enter_short exit_short)
        headers = ['date', 'open', 'high', 'low', 'close', 'enter_long', 'exit_long']

        trades: list = []
        for pair, pair_data in preprocessed.items():
            # Sorting dataframe by date and reset index
            pair_data = pair_data.sort_values(by=['date'])
            pair_data = pair_data.reset_index(drop=True)

            df_analyzed = self.strategy.ft_advise_signals(pair_data, {'pair': pair})[headers].copy()

            trades += self._find_trades_for_stoploss_range(df_analyzed, pair, self._stoploss_range)

        # If no trade found then exit
        if not trades:
            logger.info("No trades found.")
            return False

        # Fill missing, calculable columns, profit, duration , abs etc.
        trades_df = self._fill_calculable_fields(DataFrame(trades))
        self._cached_pairs = self._process_expectancy(trades_df)
        self._last_updated = int(dt_now().timestamp())

        return True
1✔
190

191
    def stake_amount(self, pair: str, free_capital: float,
                     total_capital: float, capital_in_trade: float) -> float:
        """
        Calculate the position size for a pair.

        The size is chosen so that hitting the pair's stoploss loses at most the
        allowed risk share of the available capital, and is capped by both the
        free capital and the available capital.

        :param pair: Pair to size the position for.
        :param free_capital: Capital that is currently free.
        :param total_capital: Total capital.
        :param capital_in_trade: Capital currently bound in trades.
        :return: Position size, rounded to 15 decimals.
        """
        pair_stoploss = self.get_stoploss(pair)
        usable_capital = (total_capital + capital_in_trade) * self._capital_ratio
        capital_at_risk = usable_capital * self._allowed_risk
        size_by_risk = abs(capital_at_risk / pair_stoploss)
        # Position size must be below available capital.
        position_size = min(size_by_risk, free_capital, usable_capital)

        # Only pairs Edge has statistics for get the detailed log line.
        if pair not in self._cached_pairs:
            return round(position_size, 15)

        pair_info = self._cached_pairs[pair]
        logger.info(
            'winrate: %s, expectancy: %s, position size: %s, pair: %s,'
            ' capital in trade: %s, free capital: %s, total capital: %s,'
            ' stoploss: %s, available capital: %s.',
            pair_info.winrate,
            pair_info.expectancy,
            position_size, pair,
            capital_in_trade, free_capital, total_capital,
            pair_stoploss, usable_capital
        )
        return round(position_size, 15)
1✔
211

212
    def get_stoploss(self, pair: str) -> float:
        """
        Return the stoploss Edge calculated for a pair.

        :param pair: Pair to look up.
        :return: Cached Edge stoploss of the pair; the strategy stoploss (with a
            warning logged) if Edge has no statistics for the pair.
        """
        if pair not in self._cached_pairs:
            logger.warning(f'Tried to access stoploss of non-existing pair {pair}, '
                           'strategy stoploss is returned instead.')
            return self.strategy.stoploss

        return self._cached_pairs[pair].stoploss
1✔
219

220
    def adjust(self, pairs: List[str]) -> list:
        """
        Filter "pairs" down to those meeting the Edge thresholds.

        A pair is kept when its cached expectancy and winrate are above
        'minimum_expectancy' (default 0.2) and 'minimum_winrate' (default 0.60)
        and it is contained in `pairs`. The result follows the order of the
        cached Edge pairs. A log line is written only when the selection changed.

        :param pairs: Candidate pairs.
        :return: The filtered list of pairs.
        """
        selected = [
            pair for pair, info in self._cached_pairs.items()
            if info.expectancy > float(self.edge_config.get('minimum_expectancy', 0.2))
            and info.winrate > float(self.edge_config.get('minimum_winrate', 0.60))
            and pair in pairs
        ]

        # Nothing changed since the last call: keep the stored list, stay quiet.
        if not (self._final_pairs != selected):
            return self._final_pairs

        self._final_pairs = selected
        if selected:
            logger.info(
                'Minimum expectancy and minimum winrate are met only for %s,'
                ' so other pairs are filtered out.',
                self._final_pairs
            )
        else:
            logger.info(
                'Edge removed all pairs as no pair with minimum expectancy '
                'and minimum winrate was found !'
            )

        return self._final_pairs
1✔
248

249
    def accepted_pairs(self) -> List[Dict[str, Any]]:
        """
        List the cached pairs that pass the Edge thresholds.

        :return: One dict per pair whose expectancy and winrate are above
            'minimum_expectancy' (default 0.2) and 'minimum_winrate' (default 0.60),
            with the keys 'Pair', 'Winrate', 'Expectancy' and 'Stoploss'.
        """
        return [
            {
                'Pair': pair,
                'Winrate': info.winrate,
                'Expectancy': info.expectancy,
                'Stoploss': info.stoploss,
            }
            for pair, info in self._cached_pairs.items()
            if info.expectancy > float(self.edge_config.get('minimum_expectancy', 0.2))
            and info.winrate > float(self.edge_config.get('minimum_winrate', 0.60))
        ]
1✔
264

265
    def _fill_calculable_fields(self, result: DataFrame) -> DataFrame:
1✔
266
        """
267
        The result frame contains a number of columns that are calculable
268
        from other columns. These are left blank till all rows are added,
269
        to be populated in single vector calls.
270

271
        Columns to be populated are:
272
        - Profit
273
        - trade duration
274
        - profit abs
275
        :param result Dataframe
276
        :return: result Dataframe
277
        """
278
        # We set stake amount to an arbitrary amount, as it doesn't change the calculation.
279
        # All returned values are relative, they are defined as ratios.
280
        stake = 0.015
1✔
281

282
        result['trade_duration'] = result['close_date'] - result['open_date']
1✔
283

284
        result['trade_duration'] = result['trade_duration'].map(
1✔
285
            lambda x: int(x.total_seconds() / 60))
286

287
        # Spends, Takes, Profit, Absolute Profit
288

289
        # Buy Price
290
        result['buy_vol'] = stake / result['open_rate']  # How many target are we buying
1✔
291
        result['buy_fee'] = stake * self.fee
1✔
292
        result['buy_spend'] = stake + result['buy_fee']  # How much we're spending
1✔
293

294
        # Sell price
295
        result['sell_sum'] = result['buy_vol'] * result['close_rate']
1✔
296
        result['sell_fee'] = result['sell_sum'] * self.fee
1✔
297
        result['sell_take'] = result['sell_sum'] - result['sell_fee']
1✔
298

299
        # profit_ratio
300
        result['profit_ratio'] = (result['sell_take'] - result['buy_spend']) / result['buy_spend']
1✔
301

302
        # Absolute profit
303
        result['profit_abs'] = result['sell_take'] - result['buy_spend']
1✔
304

305
        return result
1✔
306

307
    def _process_expectancy(self, results: DataFrame) -> Dict[str, Any]:
1✔
308
        """
309
        This calculates WinRate, Required Risk Reward, Risk Reward and Expectancy of all pairs
310
        The calculation will be done per pair and per strategy.
311
        """
312
        # Removing pairs having less than min_trades_number
313
        min_trades_number = self.edge_config.get('min_trade_number', 10)
1✔
314
        results = results.groupby(['pair', 'stoploss']).filter(lambda x: len(x) > min_trades_number)
1✔
315
        ###################################
316

317
        # Removing outliers (Only Pumps) from the dataset
318
        # The method to detect outliers is to calculate standard deviation
319
        # Then every value more than (standard deviation + 2*average) is out (pump)
320
        #
321
        # Removing Pumps
322
        if self.edge_config.get('remove_pumps', False):
1✔
323
            results = results[results['profit_abs'] < 2 * results['profit_abs'].std()
1✔
324
                              + results['profit_abs'].mean()]
325
        ##########################################################################
326

327
        # Removing trades having a duration more than X minutes (set in config)
328
        max_trade_duration = self.edge_config.get('max_trade_duration_minute', 1440)
1✔
329
        results = results[results.trade_duration < max_trade_duration]
1✔
330
        #######################################################################
331

332
        if results.empty:
1✔
333
            return {}
1✔
334

335
        groupby_aggregator = {
1✔
336
            'profit_abs': [
337
                ('nb_trades', 'count'),  # number of all trades
338
                ('profit_sum', lambda x: x[x > 0].sum()),  # cumulative profit of all winning trades
339
                ('loss_sum', lambda x: abs(x[x < 0].sum())),  # cumulative loss of all losing trades
340
                ('nb_win_trades', lambda x: x[x > 0].count())  # number of winning trades
341
            ],
342
            'trade_duration': [('avg_trade_duration', 'mean')]
343
        }
344

345
        # Group by (pair and stoploss) by applying above aggregator
346
        df = results.groupby(['pair', 'stoploss'])[['profit_abs', 'trade_duration']].agg(
1✔
347
            groupby_aggregator).reset_index(col_level=1)
348

349
        # Dropping level 0 as we don't need it
350
        df.columns = df.columns.droplevel(0)
1✔
351

352
        # Calculating number of losing trades, average win and average loss
353
        df['nb_loss_trades'] = df['nb_trades'] - df['nb_win_trades']
1✔
354
        df['average_win'] = np.where(df['nb_win_trades'] == 0, 0.0,
1✔
355
                                     df['profit_sum'] / df['nb_win_trades'])
356
        df['average_loss'] = np.where(df['nb_loss_trades'] == 0, 0.0,
1✔
357
                                      df['loss_sum'] / df['nb_loss_trades'])
358

359
        # Win rate = number of profitable trades / number of trades
360
        df['winrate'] = df['nb_win_trades'] / df['nb_trades']
1✔
361

362
        # risk_reward_ratio = average win / average loss
363
        df['risk_reward_ratio'] = df['average_win'] / df['average_loss']
1✔
364

365
        # required_risk_reward = (1 / winrate) - 1
366
        df['required_risk_reward'] = (1 / df['winrate']) - 1
1✔
367

368
        # expectancy = (risk_reward_ratio * winrate) - (lossrate)
369
        df['expectancy'] = (df['risk_reward_ratio'] * df['winrate']) - (1 - df['winrate'])
1✔
370

371
        # sort by expectancy and stoploss
372
        df = df.sort_values(by=['expectancy', 'stoploss'], ascending=False).groupby(
1✔
373
            'pair').first().sort_values(by=['expectancy'], ascending=False).reset_index()
374

375
        final = {}
1✔
376
        for x in df.itertuples():
1✔
377
            final[x.pair] = PairInfo(
1✔
378
                x.stoploss,
379
                x.winrate,
380
                x.risk_reward_ratio,
381
                x.required_risk_reward,
382
                x.expectancy,
383
                x.nb_trades,
384
                x.avg_trade_duration
385
            )
386

387
        # Returning a list of pairs in order of "expectancy"
388
        return final
1✔
389

390
    def _find_trades_for_stoploss_range(self, df, pair: str, stoploss_range) -> list:
1✔
391
        buy_column = df['enter_long'].values
1✔
392
        sell_column = df['exit_long'].values
1✔
393
        date_column = df['date'].values
1✔
394
        ohlc_columns = df[['open', 'high', 'low', 'close']].values
1✔
395

396
        result: list = []
1✔
397
        for stoploss in stoploss_range:
1✔
398
            result += self._detect_next_stop_or_sell_point(
1✔
399
                buy_column, sell_column, date_column, ohlc_columns, round(stoploss, 6), pair
400
            )
401

402
        return result
1✔
403

404
    def _detect_next_stop_or_sell_point(self, buy_column, sell_column, date_column,
                                        ohlc_columns, stoploss, pair: str):
        """
        Walk through the candles and collect every closed trade for one stoploss.

        A trade opens on the candle after the first buy signal, at that candle's
        open price. It closes either where the low first drops below the stop
        price (exit at the stop price) or on the candle after the first sell
        signal (exit at that candle's open price); on a tie the stoploss wins.
        After each trade all arrays are sliced from the exit index onwards and
        the search continues on the remaining candles.

        :param buy_column: Array of entry signals (1 marks an entry).
        :param sell_column: Array of exit signals (1 marks an exit).
        :param date_column: Array of candle dates.
        :param ohlc_columns: 2D array whose columns are open, high, low, close.
        :param stoploss: Stoploss ratio applied to the open price.
        :param pair: Pair the data belongs to.
        :return: List of trade dicts; trades that never close are not included.

        Author: https://github.com/mishaker
        """

        result: list = []

        while True:
            open_trade_index = utf1st.find_1st(buy_column, 1, utf1st.cmp_equal)

            # Stop if there is no trade entry (i.e. buy==1) or
            # the buy is on the last candle, leaving no candle to open on
            if open_trade_index == -1 or open_trade_index == len(buy_column) - 1:
                break

            # When a buy signal is seen,
            # trade opens in reality on the next candle
            open_trade_index += 1

            open_price = ohlc_columns[open_trade_index, 0]
            stop_price = (open_price * (stoploss + 1))

            # Searching for the index where stoploss is hit (low below stop price)
            stop_index = utf1st.find_1st(
                ohlc_columns[open_trade_index:, 2], stop_price, utf1st.cmp_smaller)

            # If we don't find it then we assume stop_index will be far in future (infinite number)
            if stop_index == -1:
                stop_index = float('inf')

            # Searching for the index where sell is hit
            sell_index = utf1st.find_1st(sell_column[open_trade_index:], 1, utf1st.cmp_equal)

            # If we don't find it then we assume sell_index will be far in future (infinite number)
            if sell_index == -1:
                sell_index = float('inf')

            # Check if we don't find any stop or sell point (in that case trade remains open)
            # It is not interesting for Edge to consider it so we simply ignore the trade
            # And stop iterating there is no more entry
            if stop_index == sell_index == float('inf'):
                break

            if stop_index <= sell_index:
                # Both indexes are relative to open_trade_index
                exit_index = open_trade_index + stop_index
                exit_type = ExitType.STOP_LOSS
                exit_price = stop_price
            else:
                # If exit is SELL then we exit at the next candle
                exit_index = open_trade_index + sell_index + 1

                # Check if we have the next candle
                if len(ohlc_columns) - 1 < exit_index:
                    break

                exit_type = ExitType.EXIT_SIGNAL
                exit_price = ohlc_columns[exit_index, 0]

            # profit_ratio, profit_abs and trade_duration are left blank here;
            # _fill_calculable_fields populates them for all trades at once.
            trade = {'pair': pair,
                     'stoploss': stoploss,
                     'profit_ratio': '',
                     'profit_abs': '',
                     'open_date': date_column[open_trade_index],
                     'close_date': date_column[exit_index],
                     'trade_duration': '',
                     'open_rate': round(open_price, 15),
                     'close_rate': round(exit_price, 15),
                     'exit_type': exit_type
                     }

            result.append(trade)

            # Giving a view of exit_index till the end of array
            buy_column = buy_column[exit_index:]
            sell_column = sell_column[exit_index:]
            date_column = date_column[exit_index:]
            ohlc_columns = ohlc_columns[exit_index:]

        return result
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc