• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 4131167254

pending completion
4131167254

push

github-actions

GitHub
Merge pull request #7983 from stash86/bt-metrics

16866 of 17748 relevant lines covered (95.03%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.26
/freqtrade/data/entryexitanalysis.py
1
import logging
1✔
2
from pathlib import Path
1✔
3

4
import joblib
1✔
5
import pandas as pd
1✔
6
from tabulate import tabulate
1✔
7

8
from freqtrade.configuration import TimeRange
1✔
9
from freqtrade.constants import Config
1✔
10
from freqtrade.data.btanalysis import (get_latest_backtest_filename, load_backtest_data,
1✔
11
                                       load_backtest_stats)
12
from freqtrade.exceptions import OperationalException
1✔
13

14

15
logger = logging.getLogger(__name__)
1✔
16

17

18
def _load_signal_candles(backtest_dir: Path):
    """
    Load the pickled signal candles exported alongside a backtest result.

    :param backtest_dir: Either the backtest results directory (the latest
        result's "<stem>_signals.pkl" is used) or a specific results file
        (the sibling "<stem>_signals.pkl" is used).
    :return: Signal candles object as pickled by the backtester — used
        downstream as {strategy_name: {pair: DataFrame}} — or an empty dict
        if the file could not be loaded.
    """
    if backtest_dir.is_dir():
        scpf = Path(backtest_dir,
                    Path(get_latest_backtest_filename(backtest_dir)).stem + "_signals.pkl"
                    )
    else:
        scpf = Path(backtest_dir.parent / f"{backtest_dir.stem}_signals.pkl")

    # Default so the return below cannot raise NameError if loading fails.
    signal_candles = {}
    try:
        # Context manager ensures the file handle is closed even on failure
        # (the original left it open).
        with open(scpf, "rb") as scp:
            signal_candles = joblib.load(scp)
        logger.info(f"Loaded signal candles: {str(scpf)}")
    except Exception as e:
        # Lazy %-interpolation so the actual error appears in the log; the
        # original passed `e` without a placeholder, silently dropping it.
        logger.error("Cannot load signal candles from pickled results: %s", e)

    return signal_candles
34

35

36
def _process_candles_and_indicators(pairlist, strategy_name, trades, signal_candles):
    """
    Analyse trades against their signal candles for every whitelisted pair.

    :param pairlist: Pairs to process (typically the exchange pair whitelist).
    :param strategy_name: Strategy the trades / signal candles belong to.
    :param trades: DataFrame of all trades from the backtest.
    :param signal_candles: Dict of {strategy_name: {pair: DataFrame}}.
    :return: {strategy_name: {pair: merged trades/indicators DataFrame}};
        pairs without signal candles are skipped.
    """
    analysed_trades_dict = {strategy_name: {}}

    try:
        logger.info(f"Processing {strategy_name} : {len(pairlist)} pairs")

        for pair in pairlist:
            if pair in signal_candles[strategy_name]:
                analysed_trades_dict[strategy_name][pair] = _analyze_candles_and_indicators(
                                                              pair,
                                                              trades,
                                                              signal_candles[strategy_name][pair])
    except Exception as e:
        # Best effort: keep whatever pairs were already processed. Report via
        # the module logger (was a bare print()) for consistency with the
        # rest of this file.
        logger.warning(f"Cannot process entry/exit reasons for {strategy_name}: {e}")

    return analysed_trades_dict
53

54

55
def _analyze_candles_and_indicators(pair, trades: pd.DataFrame, signal_candles: pd.DataFrame):
1✔
56
    buyf = signal_candles
1✔
57

58
    if len(buyf) > 0:
1✔
59
        buyf = buyf.set_index('date', drop=False)
1✔
60
        trades_red = trades.loc[trades['pair'] == pair].copy()
1✔
61

62
        trades_inds = pd.DataFrame()
1✔
63

64
        if trades_red.shape[0] > 0 and buyf.shape[0] > 0:
1✔
65
            for t, v in trades_red.open_date.items():
1✔
66
                allinds = buyf.loc[(buyf['date'] < v)]
1✔
67
                if allinds.shape[0] > 0:
1✔
68
                    tmp_inds = allinds.iloc[[-1]]
1✔
69

70
                    trades_red.loc[t, 'signal_date'] = tmp_inds['date'].values[0]
1✔
71
                    trades_red.loc[t, 'enter_reason'] = trades_red.loc[t, 'enter_tag']
1✔
72
                    tmp_inds.index.rename('signal_date', inplace=True)
1✔
73
                    trades_inds = pd.concat([trades_inds, tmp_inds])
1✔
74

75
            if 'signal_date' in trades_red:
1✔
76
                trades_red['signal_date'] = pd.to_datetime(trades_red['signal_date'], utc=True)
1✔
77
                trades_red.set_index('signal_date', inplace=True)
1✔
78

79
                try:
1✔
80
                    trades_red = pd.merge(trades_red, trades_inds, on='signal_date', how='outer')
1✔
81
                except Exception as e:
×
82
                    raise e
×
83
        return trades_red
1✔
84
    else:
85
        return pd.DataFrame()
×
86

87

88
def _do_group_table_output(bigdf, glist):
    """
    Print aggregated trade statistics for each requested grouping.

    :param bigdf: Combined trades + indicators DataFrame
        (needs 'profit_abs', 'profit_ratio', 'pair', 'enter_reason',
        'exit_reason').
    :param glist: List of group ids: "0" wins/losses by enter tag,
        "1"-"4" profit summaries by increasingly fine group keys.
    """
    for g in glist:
        # 0: summary wins/losses grouped by enter tag
        if g == "0":
            group_mask = ['enter_reason']
            wins = bigdf.loc[bigdf['profit_abs'] >= 0] \
                        .groupby(group_mask) \
                        .agg({'profit_abs': ['sum']})

            wins.columns = ['profit_abs_wins']
            loss = bigdf.loc[bigdf['profit_abs'] < 0] \
                        .groupby(group_mask) \
                        .agg({'profit_abs': ['sum']})
            loss.columns = ['profit_abs_loss']

            # count / wins (profit > 0) / losses (profit <= 0) per enter tag
            new = bigdf.groupby(group_mask).agg({'profit_abs': [
                                                    'count',
                                                    lambda x: sum(x > 0),
                                                    lambda x: sum(x <= 0)]})
            new = pd.concat([new, wins, loss], axis=1).fillna(0)

            new['profit_tot'] = new['profit_abs_wins'] - abs(new['profit_abs_loss'])
            new['wl_ratio_pct'] = (new.iloc[:, 1] / new.iloc[:, 0] * 100).fillna(0)
            new['avg_win'] = (new['profit_abs_wins'] / new.iloc[:, 1]).fillna(0)
            new['avg_loss'] = (new['profit_abs_loss'] / new.iloc[:, 2]).fillna(0)

            new.columns = ['total_num_buys', 'wins', 'losses', 'profit_abs_wins', 'profit_abs_loss',
                           'profit_tot', 'wl_ratio_pct', 'avg_win', 'avg_loss']

            sortcols = ['total_num_buys']

            _print_table(new, sortcols, show_index=True)

        else:
            agg_mask = {'profit_abs': ['count', 'sum', 'median', 'mean'],
                        'profit_ratio': ['median', 'mean', 'sum']}
            agg_cols = ['num_buys', 'profit_abs_sum', 'profit_abs_median',
                        'profit_abs_mean', 'median_profit_pct', 'mean_profit_pct',
                        'total_profit_pct']
            sortcols = ['profit_abs_sum', 'enter_reason']

            # Fix: initialise group_mask so an unknown group id reaches the
            # warning below instead of raising NameError (or silently reusing
            # the mask from a previous loop iteration).
            group_mask = []

            # 1: profit summaries grouped by enter_tag
            if g == "1":
                group_mask = ['enter_reason']
            # 2: profit summaries grouped by enter_tag and exit_tag
            elif g == "2":
                group_mask = ['enter_reason', 'exit_reason']
            # 3: profit summaries grouped by pair and enter_tag
            elif g == "3":
                group_mask = ['pair', 'enter_reason']
            # 4: profit summaries grouped by pair, enter_ and exit_tag (this can get quite large)
            elif g == "4":
                group_mask = ['pair', 'enter_reason', 'exit_reason']

            if group_mask:
                new = bigdf.groupby(group_mask).agg(agg_mask).reset_index()
                new.columns = group_mask + agg_cols
                new['median_profit_pct'] = new['median_profit_pct'] * 100
                new['mean_profit_pct'] = new['mean_profit_pct'] * 100
                new['total_profit_pct'] = new['total_profit_pct'] * 100

                _print_table(new, sortcols)
            else:
                logger.warning("Invalid group mask specified.")
154

155

156
def _select_rows_within_dates(df, timerange=None, df_date_col: str = 'date'):
1✔
157
    if timerange:
1✔
158
        if timerange.starttype == 'date':
1✔
159
            df = df.loc[(df[df_date_col] >= timerange.startdt)]
1✔
160
        if timerange.stoptype == 'date':
1✔
161
            df = df.loc[(df[df_date_col] < timerange.stopdt)]
1✔
162
    return df
1✔
163

164

165
def _select_rows_by_tags(df, enter_reason_list, exit_reason_list):
1✔
166
    if enter_reason_list and "all" not in enter_reason_list:
1✔
167
        df = df.loc[(df['enter_reason'].isin(enter_reason_list))]
×
168

169
    if exit_reason_list and "all" not in exit_reason_list:
1✔
170
        df = df.loc[(df['exit_reason'].isin(exit_reason_list))]
×
171
    return df
1✔
172

173

174
def prepare_results(analysed_trades, stratname,
                    enter_reason_list, exit_reason_list,
                    timerange=None):
    """
    Combine the per-pair analysed trades for *stratname* into a single
    DataFrame and apply the date and entry/exit-reason filters.
    """
    # Seed with an empty frame so a strategy with no analysed pairs still
    # yields an (empty) DataFrame.
    frames = [pd.DataFrame()]
    frames.extend(analysed_trades[stratname].values())
    res_df = pd.concat(frames, ignore_index=True)

    res_df = _select_rows_within_dates(res_df, timerange)

    if res_df is not None and res_df.shape[0] > 0 and ('enter_reason' in res_df.columns):
        res_df = _select_rows_by_tags(res_df, enter_reason_list, exit_reason_list)

    return res_df
187

188

189
def print_results(res_df, analysis_groups, indicator_list):
    """
    Print the requested group summaries and/or per-trade indicator values.

    :param res_df: Filtered trades DataFrame from prepare_results().
    :param analysis_groups: List of group ids ("0"-"4") for grouped output.
    :param indicator_list: Indicator column names to print per trade,
        ["all"] for the full DataFrame, or None / [] for none.
    """
    if res_df.shape[0] > 0:
        if analysis_groups:
            _do_group_table_output(res_df, analysis_groups)

        # Fix: test for None first — the original evaluated
        # `"all" in indicator_list` before the None check, raising TypeError
        # for indicator_list=None.
        if indicator_list is not None:
            if "all" in indicator_list:
                print(res_df)
            else:
                # Only columns actually present in the results are printed.
                available_inds = [ind for ind in indicator_list if ind in res_df]
                ilist = ["pair", "enter_reason", "exit_reason"] + available_inds
                _print_table(res_df[ilist], sortcols=['exit_reason'], show_index=False)
    else:
        # Fix: was "\\No trades to show", printing a literal backslash-N
        # instead of a leading newline.
        print("\nNo trades to show")
205

206

207
def _print_table(df, sortcols=None, show_index=False):
    """
    Pretty-print *df* as a psql-style table, optionally sorted by *sortcols*.
    """
    data = df if sortcols is None else df.sort_values(sortcols)
    print(tabulate(data, headers='keys', tablefmt='psql', showindex=show_index))
221

222

223
def process_entry_exit_reasons(config: Config):
    """
    Entry point for the backtest entry/exit-reason analysis command.

    Loads the backtest stats, per-strategy trades and the exported signal
    candles, merges trades with their signal candles, then prints the
    configured group summaries / indicator tables.

    :param config: Configuration dict (reads 'analysis_groups',
        'enter_reason_list', 'exit_reason_list', 'indicator_list',
        'timerange', 'exportfilename' and the exchange pair whitelist).
    :raises OperationalException: when a ValueError occurs during analysis
        (e.g. a malformed timerange).
    """
    try:
        analysis_groups = config.get('analysis_groups', [])
        enter_reason_list = config.get('enter_reason_list', ["all"])
        exit_reason_list = config.get('exit_reason_list', ["all"])
        indicator_list = config.get('indicator_list', [])

        timerange = TimeRange.parse_timerange(None if config.get(
            'timerange') is None else str(config.get('timerange')))

        backtest_stats = load_backtest_stats(config['exportfilename'])

        # Only the strategy names are needed; the stats values were unused.
        for strategy_name in backtest_stats['strategy']:
            trades = load_backtest_data(config['exportfilename'], strategy_name)

            if not trades.empty:
                signal_candles = _load_signal_candles(config['exportfilename'])
                analysed_trades_dict = _process_candles_and_indicators(
                                        config['exchange']['pair_whitelist'], strategy_name,
                                        trades, signal_candles)

                res_df = prepare_results(analysed_trades_dict, strategy_name,
                                         enter_reason_list, exit_reason_list,
                                         timerange=timerange)

                print_results(res_df,
                              analysis_groups,
                              indicator_list)

    except ValueError as e:
        raise OperationalException(e) from e
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc