• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 9394559170

26 Apr 2024 06:36AM UTC coverage: 94.656% (-0.02%) from 94.674%
9394559170

push

github

xmatthias
Loader should be passed as kwarg for clarity

20280 of 21425 relevant lines covered (94.66%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.41
/freqtrade/data/entryexitanalysis.py
1
import logging
1✔
2
from pathlib import Path
1✔
3
from typing import List
1✔
4

5
import joblib
1✔
6
import pandas as pd
1✔
7
from tabulate import tabulate
1✔
8

9
from freqtrade.configuration import TimeRange
1✔
10
from freqtrade.constants import Config
1✔
11
from freqtrade.data.btanalysis import (get_latest_backtest_filename, load_backtest_data,
1✔
12
                                       load_backtest_stats)
13
from freqtrade.exceptions import OperationalException
1✔
14

15

16
logger = logging.getLogger(__name__)
1✔
17

18

19
def _load_backtest_analysis_data(backtest_dir: Path, name: str):
    """
    Load pickled backtest analysis data stored next to the backtest results.

    :param backtest_dir: Directory containing backtest results, or a direct
                         path to a single backtest results file.
    :param name: Data suffix to load (e.g. "signals" or "rejected").
    :return: The unpickled object, or None if loading failed.
    """
    if backtest_dir.is_dir():
        # Derive the pickle filename from the most recent backtest result
        # in the directory.
        scpf = Path(backtest_dir,
                    Path(get_latest_backtest_filename(backtest_dir)).stem + "_" + name + ".pkl"
                    )
    else:
        scpf = Path(backtest_dir.parent / f"{backtest_dir.stem}_{name}.pkl")

    try:
        with scpf.open("rb") as scp:
            loaded_data = joblib.load(scp)
            logger.info(f"Loaded {name} candles: {str(scpf)}")
    except Exception as e:
        # Best-effort load: log the failure and signal "no data" to the
        # caller instead of raising.
        # Fixed: the original passed `e` as a lazy %-argument to a format
        # string with no placeholder, so the exception text was never shown.
        logger.error(f"Cannot load {name} data from pickled results: {e}")
        return None

    return loaded_data
36

37

38
def _load_rejected_signals(backtest_dir: Path):
    """Load the pickled rejected-signal candles for a backtest run."""
    rejected = _load_backtest_analysis_data(backtest_dir, "rejected")
    return rejected
40

41

42
def _load_signal_candles(backtest_dir: Path):
    """Load the pickled signal candles for a backtest run."""
    signals = _load_backtest_analysis_data(backtest_dir, "signals")
    return signals
44

45

46
def _process_candles_and_indicators(pairlist, strategy_name, trades, signal_candles):
    """
    Match every pair's trades against its signal candles for one strategy.

    :param pairlist: Pairs to process.
    :param strategy_name: Strategy the trades / signal candles belong to.
    :param trades: DataFrame containing all trades of the strategy.
    :param signal_candles: Dict of {strategy: {pair: DataFrame}} signal candles.
    :return: {strategy_name: {pair: trades merged with signal-candle indicators}}
    """
    analysed_trades_dict = {strategy_name: {}}

    try:
        logger.info(f"Processing {strategy_name} : {len(pairlist)} pairs")

        for pair in pairlist:
            # Pairs without recorded signal candles are skipped silently.
            if pair in signal_candles[strategy_name]:
                analysed_trades_dict[strategy_name][pair] = _analyze_candles_and_indicators(
                    pair, trades, signal_candles[strategy_name][pair])
    except Exception as e:
        # Consistency fix: use the module logger (with traceback) instead of
        # the bare print() the original used - everything else here logs.
        logger.exception(f"Cannot process entry/exit reasons for {strategy_name}: {e}")

    return analysed_trades_dict
61

62

63
def _analyze_candles_and_indicators(pair, trades: pd.DataFrame, signal_candles: pd.DataFrame):
1✔
64
    buyf = signal_candles
1✔
65

66
    if len(buyf) > 0:
1✔
67
        buyf = buyf.set_index('date', drop=False)
1✔
68
        trades_red = trades.loc[trades['pair'] == pair].copy()
1✔
69

70
        trades_inds = pd.DataFrame()
1✔
71

72
        if trades_red.shape[0] > 0 and buyf.shape[0] > 0:
1✔
73
            for t, v in trades_red.open_date.items():
1✔
74
                allinds = buyf.loc[(buyf['date'] < v)]
1✔
75
                if allinds.shape[0] > 0:
1✔
76
                    tmp_inds = allinds.iloc[[-1]]
1✔
77

78
                    trades_red.loc[t, 'signal_date'] = tmp_inds['date'].values[0]
1✔
79
                    trades_red.loc[t, 'enter_reason'] = trades_red.loc[t, 'enter_tag']
1✔
80
                    tmp_inds.index.rename('signal_date', inplace=True)
1✔
81
                    trades_inds = pd.concat([trades_inds, tmp_inds])
1✔
82

83
            if 'signal_date' in trades_red:
1✔
84
                trades_red['signal_date'] = pd.to_datetime(trades_red['signal_date'], utc=True)
1✔
85
                trades_red.set_index('signal_date', inplace=True)
1✔
86

87
                try:
1✔
88
                    trades_red = pd.merge(trades_red, trades_inds, on='signal_date', how='outer')
1✔
89
                except Exception as e:
×
90
                    raise e
×
91
        return trades_red
1✔
92
    else:
93
        return pd.DataFrame()
×
94

95

96
def _do_group_table_output(bigdf, glist, csv_path: Path, to_csv=False, ):
1✔
97
    for g in glist:
1✔
98
        # 0: summary wins/losses grouped by enter tag
99
        if g == "0":
1✔
100
            group_mask = ['enter_reason']
1✔
101
            wins = bigdf.loc[bigdf['profit_abs'] >= 0] \
1✔
102
                        .groupby(group_mask) \
103
                        .agg({'profit_abs': ['sum']})
104

105
            wins.columns = ['profit_abs_wins']
1✔
106
            loss = bigdf.loc[bigdf['profit_abs'] < 0] \
1✔
107
                        .groupby(group_mask) \
108
                        .agg({'profit_abs': ['sum']})
109
            loss.columns = ['profit_abs_loss']
1✔
110

111
            new = bigdf.groupby(group_mask).agg({'profit_abs': [
1✔
112
                                                    'count',
113
                                                    lambda x: sum(x > 0),
114
                                                    lambda x: sum(x <= 0)]})
115
            new = pd.concat([new, wins, loss], axis=1).fillna(0)
1✔
116

117
            new['profit_tot'] = new['profit_abs_wins'] - abs(new['profit_abs_loss'])
1✔
118
            new['wl_ratio_pct'] = (new.iloc[:, 1] / new.iloc[:, 0] * 100).fillna(0)
1✔
119
            new['avg_win'] = (new['profit_abs_wins'] / new.iloc[:, 1]).fillna(0)
1✔
120
            new['avg_loss'] = (new['profit_abs_loss'] / new.iloc[:, 2]).fillna(0)
1✔
121

122
            new['exp_ratio'] = (
1✔
123
                (
124
                    (1 + (new['avg_win'] / abs(new['avg_loss']))) * (new['wl_ratio_pct'] / 100)
125
                ) - 1).fillna(0)
126

127
            new.columns = ['total_num_buys', 'wins', 'losses',
1✔
128
                           'profit_abs_wins', 'profit_abs_loss',
129
                           'profit_tot', 'wl_ratio_pct',
130
                           'avg_win', 'avg_loss', 'exp_ratio']
131

132
            sortcols = ['total_num_buys']
1✔
133

134
            _print_table(new, sortcols, show_index=True, name="Group 0:",
1✔
135
                         to_csv=to_csv, csv_path=csv_path)
136

137
        else:
138
            agg_mask = {'profit_abs': ['count', 'sum', 'median', 'mean'],
1✔
139
                        'profit_ratio': ['median', 'mean', 'sum']}
140
            agg_cols = ['num_buys', 'profit_abs_sum', 'profit_abs_median',
1✔
141
                        'profit_abs_mean', 'median_profit_pct', 'mean_profit_pct',
142
                        'total_profit_pct']
143
            sortcols = ['profit_abs_sum', 'enter_reason']
1✔
144

145
            # 1: profit summaries grouped by enter_tag
146
            if g == "1":
1✔
147
                group_mask = ['enter_reason']
1✔
148

149
            # 2: profit summaries grouped by enter_tag and exit_tag
150
            if g == "2":
1✔
151
                group_mask = ['enter_reason', 'exit_reason']
1✔
152

153
            # 3: profit summaries grouped by pair and enter_tag
154
            if g == "3":
1✔
155
                group_mask = ['pair', 'enter_reason']
1✔
156

157
            # 4: profit summaries grouped by pair, enter_ and exit_tag (this can get quite large)
158
            if g == "4":
1✔
159
                group_mask = ['pair', 'enter_reason', 'exit_reason']
1✔
160

161
            # 5: profit summaries grouped by exit_tag
162
            if g == "5":
1✔
163
                group_mask = ['exit_reason']
1✔
164
                sortcols = ['exit_reason']
1✔
165

166
            if group_mask:
1✔
167
                new = bigdf.groupby(group_mask).agg(agg_mask).reset_index()
1✔
168
                new.columns = group_mask + agg_cols
1✔
169
                new['median_profit_pct'] = new['median_profit_pct'] * 100
1✔
170
                new['mean_profit_pct'] = new['mean_profit_pct'] * 100
1✔
171
                new['total_profit_pct'] = new['total_profit_pct'] * 100
1✔
172

173
                _print_table(new, sortcols, name=f"Group {g}:",
1✔
174
                             to_csv=to_csv, csv_path=csv_path)
175
            else:
176
                logger.warning("Invalid group mask specified.")
×
177

178

179
def _do_rejected_signals_output(rejected_signals_df: pd.DataFrame,
                                to_csv: bool = False, csv_path=None) -> None:
    """Print (or export to CSV) the table of rejected entry signals."""
    columns = ['pair', 'date', 'enter_tag']
    sort_order = ['date', 'pair', 'enter_tag']
    _print_table(rejected_signals_df[columns], sort_order,
                 show_index=False,
                 name="Rejected Signals:",
                 to_csv=to_csv,
                 csv_path=csv_path)
189

190

191
def _select_rows_within_dates(df, timerange=None, df_date_col: str = 'date'):
1✔
192
    if timerange:
1✔
193
        if timerange.starttype == 'date':
1✔
194
            df = df.loc[(df[df_date_col] >= timerange.startdt)]
1✔
195
        if timerange.stoptype == 'date':
1✔
196
            df = df.loc[(df[df_date_col] < timerange.stopdt)]
1✔
197
    return df
1✔
198

199

200
def _select_rows_by_tags(df, enter_reason_list, exit_reason_list):
1✔
201
    if enter_reason_list and "all" not in enter_reason_list:
1✔
202
        df = df.loc[(df['enter_reason'].isin(enter_reason_list))]
×
203

204
    if exit_reason_list and "all" not in exit_reason_list:
1✔
205
        df = df.loc[(df['exit_reason'].isin(exit_reason_list))]
×
206
    return df
1✔
207

208

209
def prepare_results(analysed_trades, stratname,
                    enter_reason_list, exit_reason_list,
                    timerange=None):
    """
    Flatten the per-pair analysed trades of one strategy into a single
    DataFrame and apply timerange and enter/exit-reason filters.

    :param analysed_trades: {strategy: {pair: DataFrame}} of analysed trades.
    :param stratname: Strategy to prepare results for.
    :param enter_reason_list: Entry tags to keep ("all" keeps everything).
    :param exit_reason_list: Exit tags to keep ("all" keeps everything).
    :param timerange: Optional timerange to restrict trades to.
    :return: Filtered DataFrame of all trades of the strategy.
    """
    res_df = pd.DataFrame()
    for pair, trades in analysed_trades[stratname].items():
        if trades.shape[0] > 0:
            # Drop trades without a close date. Fixed: the original used
            # inplace=True, mutating the caller's DataFrames inside
            # `analysed_trades` as a side effect.
            trades = trades.dropna(subset=['close_date'])
            res_df = pd.concat([res_df, trades], ignore_index=True)

    res_df = _select_rows_within_dates(res_df, timerange)

    # An empty concat result has no 'enter_reason' column - skip tag filtering.
    if res_df.shape[0] > 0 and ('enter_reason' in res_df.columns):
        res_df = _select_rows_by_tags(res_df, enter_reason_list, exit_reason_list)

    return res_df
224

225

226
def print_results(res_df: pd.DataFrame, analysis_groups: List[str], indicator_list: List[str],
                  csv_path: Path, rejected_signals=None, to_csv=False):
    """
    Output the requested analysis tables for the prepared results.

    :param res_df: Prepared and filtered trades DataFrame.
    :param analysis_groups: Group ids ("0"-"5") to output.
    :param indicator_list: Indicator columns to show, or ["all"] for every column.
    :param csv_path: Directory used when exporting to CSV.
    :param rejected_signals: Optional DataFrame of rejected signals.
    :param to_csv: Write CSV files instead of printing tables.
    """
    if res_df.shape[0] > 0:
        if analysis_groups:
            _do_group_table_output(res_df, analysis_groups, to_csv=to_csv, csv_path=csv_path)

        if rejected_signals is not None:
            if rejected_signals.empty:
                print("There were no rejected signals.")
            else:
                _do_rejected_signals_output(rejected_signals, to_csv=to_csv, csv_path=csv_path)

        # NB this can be large for big dataframes!
        if "all" in indicator_list:
            _print_table(res_df,
                         show_index=False,
                         name="Indicators:",
                         to_csv=to_csv,
                         csv_path=csv_path)
        elif indicator_list:
            # Only show requested indicators that actually exist in the results.
            available_inds = [ind for ind in indicator_list if ind in res_df]
            ilist = ["pair", "enter_reason", "exit_reason"] + available_inds
            _print_table(res_df[ilist],
                         sortcols=['exit_reason'],
                         show_index=False,
                         name="Indicators:",
                         to_csv=to_csv,
                         csv_path=csv_path)
    else:
        # Fixed: the original printed "\\No trades to show" - a literal
        # backslash where a leading newline ("\n") was intended.
        print("\nNo trades to show")
259

260

261
def _print_table(df: pd.DataFrame, sortcols=None, *, show_index=False, name=None,
1✔
262
                 to_csv=False, csv_path: Path):
263
    if (sortcols is not None):
1✔
264
        data = df.sort_values(sortcols)
1✔
265
    else:
266
        data = df
×
267

268
    if to_csv:
1✔
269
        safe_name = Path(csv_path, name.lower().replace(" ", "_").replace(":", "") + ".csv")
×
270
        data.to_csv(safe_name)
×
271
        print(f"Saved {name} to {safe_name}")
×
272
    else:
273
        if name is not None:
1✔
274
            print(name)
1✔
275

276
        print(
1✔
277
            tabulate(
278
                data,
279
                headers='keys',
280
                tablefmt='psql',
281
                showindex=show_index
282
            )
283
        )
284

285

286
def process_entry_exit_reasons(config: Config):
    """
    Entry point for the entry/exit-reason analysis: load backtest results,
    match trades to their signal candles and print/export the requested
    analysis tables for every strategy found in the results.

    :param config: Freqtrade configuration; reads 'analysis_*' options and
                   'exportfilename', plus the exchange pair whitelist.
    :raises OperationalException: if the CSV directory does not exist, or a
                                  ValueError occurs during processing.
    """
    try:
        analysis_groups = config.get('analysis_groups', [])
        enter_reason_list = config.get('enter_reason_list', ["all"])
        exit_reason_list = config.get('exit_reason_list', ["all"])
        indicator_list = config.get('indicator_list', [])
        do_rejected = config.get('analysis_rejected', False)
        to_csv = config.get('analysis_to_csv', False)
        # CSV output defaults to the directory/file of the backtest export.
        csv_path = Path(config.get('analysis_csv_path', config['exportfilename']))
        if to_csv and not csv_path.is_dir():
            raise OperationalException(f"Specified directory {csv_path} does not exist.")

        timerange = TimeRange.parse_timerange(None if config.get(
            'timerange') is None else str(config.get('timerange')))

        backtest_stats = load_backtest_stats(config['exportfilename'])

        # One analysis pass per strategy contained in the backtest results.
        for strategy_name, results in backtest_stats['strategy'].items():
            trades = load_backtest_data(config['exportfilename'], strategy_name)

            if trades is not None and not trades.empty:
                signal_candles = _load_signal_candles(config['exportfilename'])

                rej_df = None
                if do_rejected:
                    # Rejected signals are stored as a separate pickle and go
                    # through the same prepare/filter pipeline as trades.
                    rejected_signals_dict = _load_rejected_signals(config['exportfilename'])
                    rej_df = prepare_results(rejected_signals_dict, strategy_name,
                                             enter_reason_list, exit_reason_list,
                                             timerange=timerange)

                analysed_trades_dict = _process_candles_and_indicators(
                                        config['exchange']['pair_whitelist'], strategy_name,
                                        trades, signal_candles)

                res_df = prepare_results(analysed_trades_dict, strategy_name,
                                         enter_reason_list, exit_reason_list,
                                         timerange=timerange)

                print_results(res_df,
                              analysis_groups,
                              indicator_list,
                              rejected_signals=rej_df,
                              to_csv=to_csv,
                              csv_path=csv_path)

    except ValueError as e:
        raise OperationalException(e) from e
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc