freqtrade / freqtrade / 4131167254 (pending completion)

push | github-actions | GitHub
Merge pull request #7983 from stash86/bt-metrics

16866 of 17748 relevant lines covered (95.03%)

0.95 hits per line

Source File: /freqtrade/freqai/data_kitchen.py (85.55%)
1
import copy
1✔
2
import logging
1✔
3
import shutil
1✔
4
from datetime import datetime, timezone
1✔
5
from math import cos, sin
1✔
6
from pathlib import Path
1✔
7
from typing import Any, Dict, List, Tuple
1✔
8

9
import numpy as np
1✔
10
import numpy.typing as npt
1✔
11
import pandas as pd
1✔
12
import psutil
1✔
13
from pandas import DataFrame
1✔
14
from scipy import stats
1✔
15
from sklearn import linear_model
1✔
16
from sklearn.cluster import DBSCAN
1✔
17
from sklearn.metrics.pairwise import pairwise_distances
1✔
18
from sklearn.model_selection import train_test_split
1✔
19
from sklearn.neighbors import NearestNeighbors
1✔
20

21
from freqtrade.configuration import TimeRange
1✔
22
from freqtrade.constants import Config
1✔
23
from freqtrade.data.converter import reduce_dataframe_footprint
1✔
24
from freqtrade.exceptions import OperationalException
1✔
25
from freqtrade.exchange import timeframe_to_seconds
1✔
26
from freqtrade.strategy.interface import IStrategy
1✔
27

28

29
SECONDS_IN_DAY = 86400
1✔
30
SECONDS_IN_HOUR = 3600
1✔
31

32
logger = logging.getLogger(__name__)
1✔
33

34

35
class FreqaiDataKitchen:
1✔
36
    """
37
    Class designed to analyze data for a single pair. Employed by the IFreqaiModel class.
38
    Functionalities include holding, saving, loading, and analyzing the data.
39

40
    This object is not persistent; it is reinstantiated for each coin, each time the coin
41
    model needs to be inferenced or trained.
42

43
    Record of contribution:
44
    FreqAI was developed by a group of individuals who all contributed specific skillsets to the
45
    project.
46

47
    Conception and software development:
48
    Robert Caulk @robcaulk
49

50
    Theoretical brainstorming:
51
    Elin Törnquist @th0rntwig
52

53
    Code review, software architecture brainstorming:
54
    @xmatthias
55

56
    Beta testing and bug reporting:
57
    @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm
58
    Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
59
    """
60

61
    def __init__(
1✔
62
        self,
63
        config: Config,
64
        live: bool = False,
65
        pair: str = "",
66
    ):
67
        self.data: Dict[str, Any] = {}
1✔
68
        self.data_dictionary: Dict[str, DataFrame] = {}
1✔
69
        self.config = config
1✔
70
        self.freqai_config: Dict[str, Any] = config["freqai"]
1✔
71
        self.full_df: DataFrame = DataFrame()
1✔
72
        self.append_df: DataFrame = DataFrame()
1✔
73
        self.data_path = Path()
1✔
74
        self.label_list: List = []
1✔
75
        self.training_features_list: List = []
1✔
76
        self.model_filename: str = ""
1✔
77
        self.backtesting_results_path = Path()
1✔
78
        self.backtest_predictions_folder: str = "backtesting_predictions"
1✔
79
        self.live = live
1✔
80
        self.pair = pair
1✔
81

82
        self.svm_model: linear_model.SGDOneClassSVM = None
1✔
83
        self.keras: bool = self.freqai_config.get("keras", False)
1✔
84
        self.set_all_pairs()
1✔
85
        self.backtest_live_models = config.get("freqai_backtest_live_models", False)
1✔
86

87
        if not self.live:
1✔
88
            self.full_path = self.get_full_models_path(self.config)
1✔
89

90
            if not self.backtest_live_models:
1✔
91
                self.full_timerange = self.create_fulltimerange(
1✔
92
                    self.config["timerange"], self.freqai_config.get("train_period_days", 0)
93
                )
94
                (self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
1✔
95
                    self.full_timerange,
96
                    config["freqai"]["train_period_days"],
97
                    config["freqai"]["backtest_period_days"],
98
                )
99

100
        self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})
1✔
101
        if not self.freqai_config.get("data_kitchen_thread_count", 0):
1✔
102
            self.thread_count = max(int(psutil.cpu_count() * 2 - 2), 1)
×
103
        else:
104
            self.thread_count = self.freqai_config["data_kitchen_thread_count"]
1✔
105
        self.train_dates: DataFrame = pd.DataFrame()
1✔
106
        self.unique_classes: Dict[str, list] = {}
1✔
107
        self.unique_class_list: list = []
1✔
108
        self.backtest_live_models_data: Dict[str, Any] = {}
1✔
109

110
    def set_paths(
1✔
111
        self,
112
        pair: str,
113
        trained_timestamp: int = None,
114
    ) -> None:
115
        """
116
        Set the paths to the data for the present coin/botloop
117
        :param pair: str = pair for which to set the data path
118
        :param trained_timestamp: int = timestamp of most recent training
119
        """
120
        self.full_path = self.get_full_models_path(self.config)
1✔
121
        self.data_path = Path(
1✔
122
            self.full_path
123
            / f"sub-train-{pair.split('/')[0]}_{trained_timestamp}"
124
        )
125

126
        return
1✔
127

128
    def make_train_test_datasets(
1✔
129
        self, filtered_dataframe: DataFrame, labels: DataFrame
130
    ) -> Dict[Any, Any]:
131
        """
132
        Given the dataframe for the full history for training, split the data into
133
        training and test data according to user-specified parameters in the configuration
134
        file.
135
        :param filtered_dataframe: cleaned dataframe ready to be split.
136
        :param labels: cleaned labels ready to be split.
137
        """
138
        feat_dict = self.freqai_config["feature_parameters"]
1✔
139

140
        if 'shuffle' not in self.freqai_config['data_split_parameters']:
1✔
141
            self.freqai_config["data_split_parameters"].update({'shuffle': False})
×
142

143
        weights: npt.ArrayLike
144
        if feat_dict.get("weight_factor", 0) > 0:
1✔
145
            weights = self.set_weights_higher_recent(len(filtered_dataframe))
1✔
146
        else:
147
            weights = np.ones(len(filtered_dataframe))
×
148

149
        if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
1✔
150
            (
1✔
151
                train_features,
152
                test_features,
153
                train_labels,
154
                test_labels,
155
                train_weights,
156
                test_weights,
157
            ) = train_test_split(
158
                filtered_dataframe[: filtered_dataframe.shape[0]],
159
                labels,
160
                weights,
161
                **self.config["freqai"]["data_split_parameters"],
162
            )
163
        else:
164
            test_labels = np.zeros(2)
×
165
            test_features = pd.DataFrame()
×
166
            test_weights = np.zeros(2)
×
167
            train_features = filtered_dataframe
×
168
            train_labels = labels
×
169
            train_weights = weights
×
170

171
        # Simplest way to reverse the order of training and test data:
172
        if self.freqai_config['feature_parameters'].get('reverse_train_test_order', False):
1✔
173
            return self.build_data_dictionary(
×
174
                test_features, train_features, test_labels,
175
                train_labels, test_weights, train_weights
176
                )
177
        else:
178
            return self.build_data_dictionary(
1✔
179
                train_features, test_features, train_labels,
180
                test_labels, train_weights, test_weights
181
            )
182

183
    def filter_features(
1✔
184
        self,
185
        unfiltered_df: DataFrame,
186
        training_feature_list: List,
187
        label_list: List = list(),
188
        training_filter: bool = True,
189
    ) -> Tuple[DataFrame, DataFrame]:
190
        """
191
        Filter the unfiltered dataframe to extract the user-requested features/labels and properly
192
        remove all NaNs. Any row with a NaN is removed from the training dataset or replaced with
193
        0s in the prediction dataset. However, the prediction dataset's do_predict will flag any
194
        row that had a NaN and will shield the user from that prediction.
195

196
        :param unfiltered_df: the full dataframe for the present training period
197
        :param training_feature_list: list, the training feature list constructed by
198
                                      self.build_feature_list() according to user specified
199
                                      parameters in the configuration file.
200
        :param labels: the labels for the dataset
201
        :param training_filter: boolean which lets the function know if it is training data or
202
                                prediction data to be filtered.
203
        :returns:
204
        :filtered_df: dataframe cleaned of NaNs and only containing the user
205
        requested feature set.
206
        :labels: labels cleaned of NaNs.
207
        """
208
        filtered_df = unfiltered_df.filter(training_feature_list, axis=1)
1✔
209
        filtered_df = filtered_df.replace([np.inf, -np.inf], np.nan)
1✔
210

211
        drop_index = pd.isnull(filtered_df).any(axis=1)  # get the rows that have NaNs,
1✔
212
        drop_index = drop_index.replace(True, 1).replace(False, 0)  # pep8 requirement.
1✔
213
        if (training_filter):
1✔
214
            const_cols = list((filtered_df.nunique() == 1).loc[lambda x: x].index)
1✔
215
            if const_cols:
1✔
216
                filtered_df = filtered_df.filter(filtered_df.columns.difference(const_cols))
1✔
217
                self.data['constant_features_list'] = const_cols
1✔
218
                logger.warning(f"Removed features {const_cols} with constant values.")
1✔
219
            else:
220
                self.data['constant_features_list'] = []
1✔
221
            # we don't care about total row number (total no. datapoints) in training, we only care
222
            # about removing any row with NaNs
223
            # if labels has multiple columns (user wants to train multiple models), we detect here
224
            labels = unfiltered_df.filter(label_list, axis=1)
1✔
225
            drop_index_labels = pd.isnull(labels).any(axis=1)
1✔
226
            drop_index_labels = drop_index_labels.replace(True, 1).replace(False, 0)
1✔
227
            dates = unfiltered_df['date']
1✔
228
            filtered_df = filtered_df[
1✔
229
                (drop_index == 0) & (drop_index_labels == 0)
230
            ]  # dropping values
231
            labels = labels[
1✔
232
                (drop_index == 0) & (drop_index_labels == 0)
233
            ]  # assuming the labels depend entirely on the dataframe here.
234
            self.train_dates = dates[
1✔
235
                (drop_index == 0) & (drop_index_labels == 0)
236
            ]
237
            logger.info(
1✔
238
                f"dropped {len(unfiltered_df) - len(filtered_df)} training points"
239
                f" due to NaNs in populated dataset {len(unfiltered_df)}."
240
            )
241
            if (1 - len(filtered_df) / len(unfiltered_df)) > 0.1 and self.live:
1✔
242
                worst_indicator = str(unfiltered_df.count().idxmin())
×
243
                logger.warning(
×
244
                    f" {(1 - len(filtered_df)/len(unfiltered_df)) * 100:.0f} percent "
245
                    " of training data dropped due to NaNs, model may perform inconsistent "
246
                    f"with expectations. Verify {worst_indicator}"
247
                )
248
            self.data["filter_drop_index_training"] = drop_index
1✔
249

250
        else:
251
            if 'constant_features_list' in self.data and len(self.data['constant_features_list']):
1✔
252
                filtered_df = self.check_pred_labels(filtered_df)
1✔
253
            # we are backtesting so we need to preserve row number to send back to strategy,
254
            # so now we use do_predict to avoid any prediction based on a NaN
255
            drop_index = pd.isnull(filtered_df).any(axis=1)
1✔
256
            self.data["filter_drop_index_prediction"] = drop_index
1✔
257
            filtered_df.fillna(0, inplace=True)
1✔
258
            # replacing all NaNs with zeros to avoid issues in 'prediction', but any prediction
259
            # that was based on a single NaN is ultimately protected from buys with do_predict
260
            drop_index = ~drop_index
1✔
261
            self.do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
1✔
262
            if (len(self.do_predict) - self.do_predict.sum()) > 0:
1✔
263
                logger.info(
1✔
264
                    "dropped %s of %s prediction data points due to NaNs.",
265
                    len(self.do_predict) - self.do_predict.sum(),
266
                    len(filtered_df),
267
                )
268
            labels = []
1✔
269

270
        return filtered_df, labels
1✔
271

272
    def build_data_dictionary(
1✔
273
        self,
274
        train_df: DataFrame,
275
        test_df: DataFrame,
276
        train_labels: DataFrame,
277
        test_labels: DataFrame,
278
        train_weights: Any,
279
        test_weights: Any,
280
    ) -> Dict:
281

282
        self.data_dictionary = {
1✔
283
            "train_features": train_df,
284
            "test_features": test_df,
285
            "train_labels": train_labels,
286
            "test_labels": test_labels,
287
            "train_weights": train_weights,
288
            "test_weights": test_weights,
289
            "train_dates": self.train_dates
290
        }
291

292
        return self.data_dictionary
1✔
293

294
    def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
1✔
295
        """
296
        Normalize all data in the data_dictionary according to the training dataset
297
        :param data_dictionary: dictionary containing the cleaned and
298
                                split training/test data/labels
299
        :returns:
300
        :data_dictionary: updated dictionary with normalized values.
301
        """
302

303
        # normalize the data by training stats
304
        train_max = data_dictionary["train_features"].max()
1✔
305
        train_min = data_dictionary["train_features"].min()
1✔
306
        data_dictionary["train_features"] = (
1✔
307
            2 * (data_dictionary["train_features"] - train_min) / (train_max - train_min) - 1
308
        )
309
        data_dictionary["test_features"] = (
1✔
310
            2 * (data_dictionary["test_features"] - train_min) / (train_max - train_min) - 1
311
        )
312

313
        for item in train_max.keys():
1✔
314
            self.data[item + "_max"] = train_max[item]
1✔
315
            self.data[item + "_min"] = train_min[item]
1✔
316

317
        for item in data_dictionary["train_labels"].keys():
1✔
318
            if data_dictionary["train_labels"][item].dtype == object:
1✔
319
                continue
1✔
320
            train_labels_max = data_dictionary["train_labels"][item].max()
1✔
321
            train_labels_min = data_dictionary["train_labels"][item].min()
1✔
322
            data_dictionary["train_labels"][item] = (
1✔
323
                2
324
                * (data_dictionary["train_labels"][item] - train_labels_min)
325
                / (train_labels_max - train_labels_min)
326
                - 1
327
            )
328
            if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
1✔
329
                data_dictionary["test_labels"][item] = (
1✔
330
                    2
331
                    * (data_dictionary["test_labels"][item] - train_labels_min)
332
                    / (train_labels_max - train_labels_min)
333
                    - 1
334
                )
335

336
            self.data[f"{item}_max"] = train_labels_max
1✔
337
            self.data[f"{item}_min"] = train_labels_min
1✔
338
        return data_dictionary
1✔
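    # --- Editor's note: illustrative only, not part of data_kitchen.py ---------
    # The normalization above is a plain min/max rescale of each feature/label
    # into [-1, 1], always using the *training* min and max, e.g.:
    #
    #   >>> import pandas as pd
    #   >>> col = pd.Series([0.0, 5.0, 10.0])
    #   >>> 2 * (col - col.min()) / (col.max() - col.min()) - 1
    #   0   -1.0
    #   1    0.0
    #   2    1.0
    #   dtype: float64
    #
    # Test and prediction data reuse the training min/max (see the
    # *_from_metadata helpers below), so points outside the training range can
    # fall outside [-1, 1]; denormalize_labels_from_metadata inverts the map.
    # ---------------------------------------------------------------------------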
339

340
    def normalize_single_dataframe(self, df: DataFrame) -> DataFrame:
1✔
341

342
        train_max = df.max()
1✔
343
        train_min = df.min()
1✔
344
        df = (
1✔
345
            2 * (df - train_min) / (train_max - train_min) - 1
346
        )
347

348
        for item in train_max.keys():
1✔
349
            self.data[item + "_max"] = train_max[item]
1✔
350
            self.data[item + "_min"] = train_min[item]
1✔
351

352
        return df
1✔
353

354
    def normalize_data_from_metadata(self, df: DataFrame) -> DataFrame:
1✔
355
        """
356
        Normalize a set of data using the min and max from
357
        the associated training data.
358
        :param df: Dataframe to be standardized
359
        """
360

361
        train_max = [None] * len(df.keys())
1✔
362
        train_min = [None] * len(df.keys())
1✔
363

364
        for i, item in enumerate(df.keys()):
1✔
365
            train_max[i] = self.data[f"{item}_max"]
1✔
366
            train_min[i] = self.data[f"{item}_min"]
1✔
367

368
        train_max_series = pd.Series(train_max, index=df.keys())
1✔
369
        train_min_series = pd.Series(train_min, index=df.keys())
1✔
370

371
        df = (
1✔
372
            2 * (df - train_min_series) / (train_max_series - train_min_series) - 1
373
        )
374

375
        return df
1✔
376

377
    def denormalize_labels_from_metadata(self, df: DataFrame) -> DataFrame:
1✔
378
        """
379
        Denormalize a set of data using the min and max from
380
        the associated training data.
381
        :param df: Dataframe of predictions to be denormalized
382
        """
383

384
        for label in df.columns:
1✔
385
            if df[label].dtype == object or label in self.unique_class_list:
1✔
386
                continue
×
387
            df[label] = (
1✔
388
                (df[label] + 1)
389
                * (self.data[f"{label}_max"] - self.data[f"{label}_min"])
390
                / 2
391
            ) + self.data[f"{label}_min"]
392

393
        return df
1✔
394

395
    def split_timerange(
1✔
396
        self, tr: str, train_split: int = 28, bt_split: float = 7
397
    ) -> Tuple[list, list]:
398
        """
399
        Function which takes a single time range (tr) and splits it
400
        into sub timeranges to train and backtest on based on user input
401
        tr: str, full timerange to train on
402
        train_split: the period length for each training (days). Specified in user
403
        configuration file
404
        bt_split: the backtesting length (days). Specified in user configuration file
405
        """
406

407
        if not isinstance(train_split, int) or train_split < 1:
1✔
408
            raise OperationalException(
1✔
409
                f"train_period_days must be an integer greater than 0. Got {train_split}."
410
            )
411
        train_period_days = train_split * SECONDS_IN_DAY
1✔
412
        bt_period = bt_split * SECONDS_IN_DAY
1✔
413

414
        full_timerange = TimeRange.parse_timerange(tr)
1✔
415
        config_timerange = TimeRange.parse_timerange(self.config["timerange"])
1✔
416
        if config_timerange.stopts == 0:
1✔
417
            config_timerange.stopts = int(
×
418
                datetime.now(tz=timezone.utc).timestamp()
419
            )
420
        timerange_train = copy.deepcopy(full_timerange)
1✔
421
        timerange_backtest = copy.deepcopy(full_timerange)
1✔
422

423
        tr_training_list = []
1✔
424
        tr_backtesting_list = []
1✔
425
        tr_training_list_timerange = []
1✔
426
        tr_backtesting_list_timerange = []
1✔
427
        first = True
1✔
428

429
        while True:
1✔
430
            if not first:
1✔
431
                timerange_train.startts = timerange_train.startts + int(bt_period)
1✔
432
            timerange_train.stopts = timerange_train.startts + train_period_days
1✔
433

434
            first = False
1✔
435
            tr_training_list.append(timerange_train.timerange_str)
1✔
436
            tr_training_list_timerange.append(copy.deepcopy(timerange_train))
1✔
437

438
            # associated backtest period
439

440
            timerange_backtest.startts = timerange_train.stopts
1✔
441

442
            timerange_backtest.stopts = timerange_backtest.startts + int(bt_period)
1✔
443

444
            if timerange_backtest.stopts > config_timerange.stopts:
1✔
445
                timerange_backtest.stopts = config_timerange.stopts
1✔
446

447
            tr_backtesting_list.append(timerange_backtest.timerange_str)
1✔
448
            tr_backtesting_list_timerange.append(copy.deepcopy(timerange_backtest))
1✔
449

450
            # ensure we are predicting on exactly the same amount of data as requested by the user-defined
451
            #  --timerange
452
            if timerange_backtest.stopts == config_timerange.stopts:
1✔
453
                break
1✔
454

455
        # print(tr_training_list, tr_backtesting_list)
456
        return tr_training_list_timerange, tr_backtesting_list_timerange
1✔
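    # --- Editor's note: illustrative only, not part of data_kitchen.py ---------
    # split_timerange builds a walk-forward schedule: every training window is
    # train_period_days long, the backtest window that follows starts where the
    # training window stops and is backtest_period_days long, and both windows
    # slide forward by backtest_period_days until the configured stop date is
    # reached (the final backtest window is clamped to that stop date).
    # Hypothetical example with a 30 day timerange, train_period_days=10 and
    # backtest_period_days=5 (numbers are day offsets):
    #
    #   train  0-10 -> backtest 10-15
    #   train  5-15 -> backtest 15-20
    #   train 10-20 -> backtest 20-25
    #   train 15-25 -> backtest 25-30
    # ---------------------------------------------------------------------------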
457

458
    def slice_dataframe(self, timerange: TimeRange, df: DataFrame) -> DataFrame:
1✔
459
        """
460
        Given a full dataframe, extract the user desired window
461
        :param tr: timerange string that we wish to extract from df
462
        :param df: Dataframe containing all candles to run the entire backtest. Here
463
                   it is sliced down to just the present training period.
464
        """
465
        if not self.live:
1✔
466
            df = df.loc[(df["date"] >= timerange.startdt) & (df["date"] < timerange.stopdt), :]
1✔
467
        else:
468
            df = df.loc[df["date"] >= timerange.startdt, :]
×
469

470
        return df
1✔
471

472
    def check_pred_labels(self, df_predictions: DataFrame) -> DataFrame:
1✔
473
        """
474
        Check that prediction feature labels match training feature labels.
475
        :param df_predictions: incoming predictions
476
        """
477
        constant_labels = self.data['constant_features_list']
1✔
478
        df_predictions = df_predictions.filter(
1✔
479
            df_predictions.columns.difference(constant_labels)
480
        )
481
        logger.warning(
1✔
482
            f"Removed {len(constant_labels)} features from prediction features, "
483
            f"these were considered constant values during most recent training."
484
        )
485

486
        return df_predictions
1✔
487

488
    def principal_component_analysis(self) -> None:
1✔
489
        """
490
        Performs Principal Component Analysis on the data for dimensionality reduction
491
        and outlier detection (see self.remove_outliers())
492
        No parameters or returns, it acts on the data_dictionary held by the DataHandler.
493
        """
494

495
        from sklearn.decomposition import PCA  # avoid importing if we don't need it
1✔
496

497
        pca = PCA(0.999)
1✔
498
        pca = pca.fit(self.data_dictionary["train_features"])
1✔
499
        n_keep_components = pca.n_components_
1✔
500
        self.data["n_kept_components"] = n_keep_components
1✔
501
        n_components = self.data_dictionary["train_features"].shape[1]
1✔
502
        logger.info("reduced feature dimension by %s", n_components - n_keep_components)
1✔
503
        logger.info("explained variance %f", np.sum(pca.explained_variance_ratio_))
1✔
504

505
        train_components = pca.transform(self.data_dictionary["train_features"])
1✔
506
        self.data_dictionary["train_features"] = pd.DataFrame(
1✔
507
            data=train_components,
508
            columns=["PC" + str(i) for i in range(0, n_keep_components)],
509
            index=self.data_dictionary["train_features"].index,
510
        )
511
        # normalising transformed training features
512
        self.data_dictionary["train_features"] = self.normalize_single_dataframe(
1✔
513
            self.data_dictionary["train_features"])
514

515
        # keeping a copy of the non-transformed features so we can check for errors during
516
        # model load from disk
517
        self.data["training_features_list_raw"] = copy.deepcopy(self.training_features_list)
1✔
518
        self.training_features_list = self.data_dictionary["train_features"].columns
1✔
519

520
        if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
1✔
521
            test_components = pca.transform(self.data_dictionary["test_features"])
1✔
522
            self.data_dictionary["test_features"] = pd.DataFrame(
1✔
523
                data=test_components,
524
                columns=["PC" + str(i) for i in range(0, n_keep_components)],
525
                index=self.data_dictionary["test_features"].index,
526
            )
527
            # normalise transformed test features to transformed training features
528
            self.data_dictionary["test_features"] = self.normalize_data_from_metadata(
1✔
529
                self.data_dictionary["test_features"])
530

531
        self.data["n_kept_components"] = n_keep_components
1✔
532
        self.pca = pca
1✔
533

534
        logger.info(f"PCA reduced total features from  {n_components} to {n_keep_components}")
1✔
535

536
        if not self.data_path.is_dir():
1✔
537
            self.data_path.mkdir(parents=True, exist_ok=True)
1✔
538

539
        return None
1✔
540

541
    def pca_transform(self, filtered_dataframe: DataFrame) -> None:
1✔
542
        """
543
        Use an existing pca transform to transform data into components
544
        :param filtered_dataframe: DataFrame = the cleaned dataframe
545
        """
546
        pca_components = self.pca.transform(filtered_dataframe)
×
547
        self.data_dictionary["prediction_features"] = pd.DataFrame(
×
548
            data=pca_components,
549
            columns=["PC" + str(i) for i in range(0, self.data["n_kept_components"])],
550
            index=filtered_dataframe.index,
551
        )
552
        # normalise transformed predictions to transformed training features
553
        self.data_dictionary["prediction_features"] = self.normalize_data_from_metadata(
×
554
            self.data_dictionary["prediction_features"])
555

556
    def compute_distances(self) -> float:
1✔
557
        """
558
        Compute distances between each training point and every other training
559
        point. This metric defines the neighborhood of trained data and is used
560
        for prediction confidence in the Dissimilarity Index
561
        """
562
        # logger.info("computing average mean distance for all training points")
563
        pairwise = pairwise_distances(
1✔
564
            self.data_dictionary["train_features"], n_jobs=self.thread_count)
565
        # remove the diagonal distances, which are self-distances (~0)
566
        np.fill_diagonal(pairwise, np.NaN)
1✔
567
        pairwise = pairwise.reshape(-1, 1)
1✔
568
        avg_mean_dist = pairwise[~np.isnan(pairwise)].mean()
1✔
569

570
        return avg_mean_dist
1✔
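    # --- Editor's note: illustrative only, not part of data_kitchen.py ---------
    # compute_distances returns the mean of all off-diagonal pairwise distances
    # between training points. Tiny hypothetical example with one feature and
    # three rows at 0, 1 and 3: the pairwise distances are 1, 3 and 2, so the
    # average mean distance is (1 + 3 + 2) / 3 = 2.0. This is the value that
    # check_if_pred_in_training_spaces reads back as self.data["avg_mean_dist"]
    # to scale the Dissimilarity Index.
    # ---------------------------------------------------------------------------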
571

572
    def get_outlier_percentage(self, dropped_pts: npt.NDArray) -> float:
1✔
573
        """
574
        Check if more than X% of points were dropped during outlier detection.
575
        """
576
        outlier_protection_pct = self.freqai_config["feature_parameters"].get(
1✔
577
            "outlier_protection_percentage", 30)
578
        outlier_pct = (dropped_pts.sum() / len(dropped_pts)) * 100
1✔
579
        if outlier_pct >= outlier_protection_pct:
1✔
580
            return outlier_pct
1✔
581
        else:
582
            return 0.0
1✔
583

584
    def use_SVM_to_remove_outliers(self, predict: bool) -> None:
1✔
585
        """
586
        Build/inference a Support Vector Machine to detect outliers
587
        in training data and prediction
588
        :param predict: bool = If true, inference an existing SVM model, else construct one
589
        """
590

591
        if self.keras:
1✔
592
            logger.warning(
×
593
                "SVM outlier removal not currently supported for Keras based models. "
594
                "Skipping user requested function."
595
            )
596
            if predict:
×
597
                self.do_predict = np.ones(len(self.data_dictionary["prediction_features"]))
×
598
            return
×
599

600
        if predict:
1✔
601
            if not self.svm_model:
1✔
602
                logger.warning("No svm model available for outlier removal")
×
603
                return
×
604
            y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
1✔
605
            do_predict = np.where(y_pred == -1, 0, y_pred)
1✔
606

607
            if (len(do_predict) - do_predict.sum()) > 0:
1✔
608
                logger.info(f"SVM tossed {len(do_predict) - do_predict.sum()} predictions.")
1✔
609
            self.do_predict += do_predict
1✔
610
            self.do_predict -= 1
1✔
611

612
        else:
613
            # use SGDOneClassSVM to increase speed?
614
            svm_params = self.freqai_config["feature_parameters"].get(
1✔
615
                "svm_params", {"shuffle": False, "nu": 0.1})
616
            self.svm_model = linear_model.SGDOneClassSVM(**svm_params).fit(
1✔
617
                self.data_dictionary["train_features"]
618
            )
619
            y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
1✔
620
            kept_points = np.where(y_pred == -1, 0, y_pred)
1✔
621
            # keep_index = np.where(y_pred == 1)
622
            outlier_pct = self.get_outlier_percentage(1 - kept_points)
1✔
623
            if outlier_pct:
1✔
624
                logger.warning(
1✔
625
                        f"SVM detected {outlier_pct:.2f}% of the points as outliers. "
626
                        f"Keeping original dataset."
627
                )
628
                self.svm_model = None
1✔
629
                return
1✔
630

631
            self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
1✔
632
                (y_pred == 1)
633
            ]
634
            self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
1✔
635
                (y_pred == 1)
636
            ]
637
            self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
1✔
638
                (y_pred == 1)
639
            ]
640

641
            logger.info(
1✔
642
                f"SVM tossed {len(y_pred) - kept_points.sum()}"
643
                f" train points from {len(y_pred)} total points."
644
            )
645

646
            # same for test data
647
            # TODO: This (and the part above) could be refactored into a separate function
648
            # to reduce code duplication
649
            if self.freqai_config['data_split_parameters'].get('test_size', 0.1) != 0:
1✔
650
                y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
1✔
651
                kept_points = np.where(y_pred == -1, 0, y_pred)
1✔
652
                self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
1✔
653
                    (y_pred == 1)
654
                ]
655
                self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][(
1✔
656
                    y_pred == 1)]
657
                self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
1✔
658
                    (y_pred == 1)
659
                ]
660

661
            logger.info(
1✔
662
                f"SVM tossed {len(y_pred) - kept_points.sum()}"
663
                f" test points from {len(y_pred)} total points."
664
            )
665

666
        return
1✔
667

668
    def use_DBSCAN_to_remove_outliers(self, predict: bool, eps=None) -> None:
1✔
669
        """
670
        Use DBSCAN to cluster training data and remove "noisy" data (read: outliers).
671
        User controls this via the config param `DBSCAN_outlier_pct` which indicates the
672
        pct of training data that they want to be considered outliers.
673
        :param predict: bool = If False (training), iterate to find the best hyperparameters
674
                        to match user requested outlier percent target.
675
                        If True (prediction), use the parameters determined from
676
                        the previous training to estimate if the current prediction point
677
                        is an outlier.
678
        """
679

680
        if predict:
1✔
681
            if not self.data['DBSCAN_eps']:
×
682
                return
×
683
            train_ft_df = self.data_dictionary['train_features']
×
684
            pred_ft_df = self.data_dictionary['prediction_features']
×
685
            num_preds = len(pred_ft_df)
×
686
            df = pd.concat([train_ft_df, pred_ft_df], axis=0, ignore_index=True)
×
687
            clustering = DBSCAN(eps=self.data['DBSCAN_eps'],
×
688
                                min_samples=self.data['DBSCAN_min_samples'],
689
                                n_jobs=self.thread_count
690
                                ).fit(df)
691
            do_predict = np.where(clustering.labels_[-num_preds:] == -1, 0, 1)
×
692

693
            if (len(do_predict) - do_predict.sum()) > 0:
×
694
                logger.info(f"DBSCAN tossed {len(do_predict) - do_predict.sum()} predictions")
×
695
            self.do_predict += do_predict
×
696
            self.do_predict -= 1
×
697

698
        else:
699

700
            def normalise_distances(distances):
1✔
701
                normalised_distances = (distances - distances.min()) / \
1✔
702
                                        (distances.max() - distances.min())
703
                return normalised_distances
1✔
704

705
            def rotate_point(origin, point, angle):
1✔
706
                # rotate a point counterclockwise by a given angle (in radians)
707
                # around a given origin
708
                x = origin[0] + cos(angle) * (point[0] - origin[0]) - \
1✔
709
                                    sin(angle) * (point[1] - origin[1])
710
                y = origin[1] + sin(angle) * (point[0] - origin[0]) + \
1✔
711
                    cos(angle) * (point[1] - origin[1])
712
                return (x, y)
1✔
713

714
            MinPts = int(len(self.data_dictionary['train_features'].index) * 0.25)
1✔
715
            # measure pairwise distances to nearest neighbours
716
            neighbors = NearestNeighbors(
1✔
717
                n_neighbors=MinPts, n_jobs=self.thread_count)
718
            neighbors_fit = neighbors.fit(self.data_dictionary['train_features'])
1✔
719
            distances, _ = neighbors_fit.kneighbors(self.data_dictionary['train_features'])
1✔
720
            distances = np.sort(distances, axis=0).mean(axis=1)
1✔
721

722
            normalised_distances = normalise_distances(distances)
1✔
723
            x_range = np.linspace(0, 1, len(distances))
1✔
724
            line = np.linspace(normalised_distances[0],
1✔
725
                               normalised_distances[-1], len(normalised_distances))
726
            deflection = np.abs(normalised_distances - line)
1✔
727
            max_deflection_loc = np.where(deflection == deflection.max())[0][0]
1✔
728
            origin = x_range[max_deflection_loc], line[max_deflection_loc]
1✔
729
            point = x_range[max_deflection_loc], normalised_distances[max_deflection_loc]
1✔
730
            rot_angle = np.pi / 4
1✔
731
            elbow_loc = rotate_point(origin, point, rot_angle)
1✔
732

733
            epsilon = elbow_loc[1] * (distances[-1] - distances[0]) + distances[0]
1✔
734

735
            clustering = DBSCAN(eps=epsilon, min_samples=MinPts,
1✔
736
                                n_jobs=int(self.thread_count)).fit(
737
                                                    self.data_dictionary['train_features']
738
                                                )
739

740
            logger.info(f'DBSCAN found eps of {epsilon:.2f}.')
1✔
741

742
            self.data['DBSCAN_eps'] = epsilon
1✔
743
            self.data['DBSCAN_min_samples'] = MinPts
1✔
744
            dropped_points = np.where(clustering.labels_ == -1, 1, 0)
1✔
745

746
            outlier_pct = self.get_outlier_percentage(dropped_points)
1✔
747
            if outlier_pct:
1✔
748
                logger.warning(
×
749
                        f"DBSCAN detected {outlier_pct:.2f}% of the points as outliers. "
750
                        f"Keeping original dataset."
751
                )
752
                self.data['DBSCAN_eps'] = 0
×
753
                return
×
754

755
            self.data_dictionary['train_features'] = self.data_dictionary['train_features'][
1✔
756
                (clustering.labels_ != -1)
757
            ]
758
            self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
1✔
759
                (clustering.labels_ != -1)
760
            ]
761
            self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
1✔
762
                (clustering.labels_ != -1)
763
            ]
764

765
            logger.info(
1✔
766
                f"DBSCAN tossed {dropped_points.sum()}"
767
                f" train points from {len(clustering.labels_)}"
768
            )
769

770
        return
1✔
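    # --- Editor's note: illustrative only, not part of data_kitchen.py ---------
    # Rough sketch of the eps estimation in the training branch above:
    # min_samples is fixed at 25% of the training rows (MinPts); the mean
    # distance of every point to its MinPts nearest neighbours, sorted, forms a
    # k-distance curve; the curve is normalised to [0, 1]; the point deflecting
    # furthest from the straight line joining the curve's endpoints is rotated
    # by pi/4 around the corresponding point on that line to read off an
    # "elbow" height; and that height is rescaled back into a real distance,
    # which becomes DBSCAN's eps. Points labelled -1 (noise) are then dropped
    # from the train/test sets unless the outlier-protection threshold is
    # exceeded, in which case the original dataset is kept.
    # ---------------------------------------------------------------------------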
771

772
    def compute_inlier_metric(self, set_='train') -> None:
1✔
773
        """
774
        Compute inlier metric from backwards distance distributions.
775
        This metric defines how well features from a timepoint fit
776
        into previous timepoints.
777
        """
778

779
        def normalise(dataframe: DataFrame, key: str) -> DataFrame:
1✔
780
            if set_ == 'train':
1✔
781
                min_value = dataframe.min()
1✔
782
                max_value = dataframe.max()
1✔
783
                self.data[f'{key}_min'] = min_value
1✔
784
                self.data[f'{key}_max'] = max_value
1✔
785
            else:
786
                min_value = self.data[f'{key}_min']
×
787
                max_value = self.data[f'{key}_max']
×
788
            return (dataframe - min_value) / (max_value - min_value)
1✔
789

790
        no_prev_pts = self.freqai_config["feature_parameters"]["inlier_metric_window"]
1✔
791

792
        if set_ == 'train':
1✔
793
            compute_df = copy.deepcopy(self.data_dictionary['train_features'])
1✔
794
        elif set_ == 'test':
×
795
            compute_df = copy.deepcopy(self.data_dictionary['test_features'])
×
796
        else:
797
            compute_df = copy.deepcopy(self.data_dictionary['prediction_features'])
×
798

799
        compute_df_reindexed = compute_df.reindex(
1✔
800
            index=np.flip(compute_df.index)
801
        )
802

803
        pairwise = pd.DataFrame(
1✔
804
            np.triu(
805
                pairwise_distances(compute_df_reindexed, n_jobs=self.thread_count)
806
            ),
807
            columns=compute_df_reindexed.index,
808
            index=compute_df_reindexed.index
809
        )
810
        pairwise = pairwise.round(5)
1✔
811

812
        column_labels = [
1✔
813
            '{}{}'.format('d', i) for i in range(1, no_prev_pts + 1)
814
        ]
815
        distances = pd.DataFrame(
1✔
816
            columns=column_labels, index=compute_df.index
817
        )
818

819
        for index in compute_df.index[no_prev_pts:]:
1✔
820
            current_row = pairwise.loc[[index]]
1✔
821
            current_row_no_zeros = current_row.loc[
1✔
822
                :, (current_row != 0).any(axis=0)
823
            ]
824
            distances.loc[[index]] = current_row_no_zeros.iloc[
1✔
825
                :, :no_prev_pts
826
            ]
827
        distances = distances.replace([np.inf, -np.inf], np.nan)
1✔
828
        drop_index = pd.isnull(distances).any(axis=1)
1✔
829
        distances = distances[drop_index == 0]
1✔
830

831
        inliers = pd.DataFrame(index=distances.index)
1✔
832
        for key in distances.keys():
1✔
833
            current_distances = distances[key].dropna()
1✔
834
            current_distances = normalise(current_distances, key)
1✔
835
            if set_ == 'train':
1✔
836
                fit_params = stats.weibull_min.fit(current_distances)
1✔
837
                self.data[f'{key}_fit_params'] = fit_params
1✔
838
            else:
839
                fit_params = self.data[f'{key}_fit_params']
×
840
            quantiles = stats.weibull_min.cdf(current_distances, *fit_params)
1✔
841

842
            df_inlier = pd.DataFrame(
1✔
843
                {key: quantiles}, index=distances.index
844
            )
845
            inliers = pd.concat(
1✔
846
                [inliers, df_inlier], axis=1
847
            )
848

849
        inlier_metric = pd.DataFrame(
1✔
850
            data=inliers.sum(axis=1) / no_prev_pts,
851
            columns=['%-inlier_metric'],
852
            index=compute_df.index
853
        )
854

855
        inlier_metric = (2 * (inlier_metric - inlier_metric.min()) /
1✔
856
                         (inlier_metric.max() - inlier_metric.min()) - 1)
857

858
        if set_ in ('train', 'test'):
1✔
859
            inlier_metric = inlier_metric.iloc[no_prev_pts:]
1✔
860
            compute_df = compute_df.iloc[no_prev_pts:]
1✔
861
            self.remove_beginning_points_from_data_dict(set_, no_prev_pts)
1✔
862
            self.data_dictionary[f'{set_}_features'] = pd.concat(
1✔
863
                [compute_df, inlier_metric], axis=1)
864
        else:
865
            self.data_dictionary['prediction_features'] = pd.concat(
×
866
                [compute_df, inlier_metric], axis=1)
867
            self.data_dictionary['prediction_features'].fillna(0, inplace=True)
×
868

869
        logger.info('Inlier metric computed and added to features.')
1✔
870

871
        return None
1✔
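    # --- Editor's note: illustrative only, not part of data_kitchen.py ---------
    # Rough summary of the inlier metric above: for each point, the distances
    # to the inlier_metric_window previous points are collected, each distance
    # column is min/max normalised and (on training data) fit to a Weibull
    # distribution, the per-column Weibull CDF quantiles are averaged, and the
    # result is rescaled to [-1, 1] and appended as the '%-inlier_metric'
    # feature. Test and prediction sets reuse the fit parameters and min/max
    # stored in self.data during training.
    # ---------------------------------------------------------------------------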
872

873
    def remove_beginning_points_from_data_dict(self, set_='train', no_prev_pts: int = 10):
1✔
874
        features = self.data_dictionary[f'{set_}_features']
1✔
875
        weights = self.data_dictionary[f'{set_}_weights']
1✔
876
        labels = self.data_dictionary[f'{set_}_labels']
1✔
877
        self.data_dictionary[f'{set_}_weights'] = weights[no_prev_pts:]
1✔
878
        self.data_dictionary[f'{set_}_features'] = features.iloc[no_prev_pts:]
1✔
879
        self.data_dictionary[f'{set_}_labels'] = labels.iloc[no_prev_pts:]
1✔
880

881
    def add_noise_to_training_features(self) -> None:
1✔
882
        """
883
        Add noise to train features to reduce the risk of overfitting.
884
        """
885
        mu = 0  # no shift
1✔
886
        sigma = self.freqai_config["feature_parameters"]["noise_standard_deviation"]
1✔
887
        compute_df = self.data_dictionary['train_features']
1✔
888
        noise = np.random.normal(mu, sigma, [compute_df.shape[0], compute_df.shape[1]])
1✔
889
        self.data_dictionary['train_features'] += noise
1✔
890
        return
1✔
891

892
    def find_features(self, dataframe: DataFrame) -> None:
1✔
893
        """
894
        Find features in the strategy provided dataframe
895
        :param dataframe: DataFrame = strategy provided dataframe
896
        :return:
897
        features: list = the features to be used for training/prediction
898
        """
899
        column_names = dataframe.columns
1✔
900
        features = [c for c in column_names if "%" in c]
1✔
901

902
        if not features:
1✔
903
            raise OperationalException("Could not find any features!")
×
904

905
        self.training_features_list = features
1✔
906

907
    def find_labels(self, dataframe: DataFrame) -> None:
1✔
908
        column_names = dataframe.columns
1✔
909
        labels = [c for c in column_names if "&" in c]
1✔
910
        self.label_list = labels
1✔
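    # --- Editor's note: illustrative only, not part of data_kitchen.py ---------
    # Feature and label discovery is purely name based: any column whose name
    # contains "%" is treated as a training feature, and any column containing
    # "&" is treated as a label. So a strategy column named e.g. "%-rsi" would
    # be picked up as a feature and "&-target" as a label (hypothetical names).
    # ---------------------------------------------------------------------------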
911

912
    def check_if_pred_in_training_spaces(self) -> None:
1✔
913
        """
914
        Compares the distance from each prediction point to each training data
915
        point. It uses this information to estimate a Dissimilarity Index (DI)
916
        and avoid making predictions on any points that are too far away
917
        from the training data set.
918
        """
919

920
        distance = pairwise_distances(
1✔
921
            self.data_dictionary["train_features"],
922
            self.data_dictionary["prediction_features"],
923
            n_jobs=self.thread_count,
924
        )
925

926
        self.DI_values = distance.min(axis=0) / self.data["avg_mean_dist"]
1✔
927

928
        do_predict = np.where(
1✔
929
            self.DI_values < self.freqai_config["feature_parameters"]["DI_threshold"],
930
            1,
931
            0,
932
        )
933

934
        if (len(do_predict) - do_predict.sum()) > 0:
1✔
935
            logger.info(
1✔
936
                f"DI tossed {len(do_predict) - do_predict.sum()} predictions for "
937
                "being too far from training data."
938
            )
939

940
        self.do_predict += do_predict
1✔
941
        self.do_predict -= 1
1✔
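    # --- Editor's note: illustrative only, not part of data_kitchen.py ---------
    # The Dissimilarity Index above is, for each prediction point, the distance
    # to its closest training point divided by the average pairwise training
    # distance (avg_mean_dist from compute_distances). Predictions with
    # DI >= DI_threshold get a 0 in do_predict. The recurring pattern
    #
    #   self.do_predict += do_predict
    #   self.do_predict -= 1
    #
    # merges each new 0/1 check into the running do_predict vector: an entry
    # stays 1 only while every check has passed (1 + 1 - 1 = 1), and any failed
    # check pulls it to 0 or below (e.g. 1 + 0 - 1 = 0).
    # ---------------------------------------------------------------------------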
942

943
    def set_weights_higher_recent(self, num_weights: int) -> npt.ArrayLike:
1✔
944
        """
945
        Set weights so that recent data is more heavily weighted during
946
        training than older data.
947
        """
948
        wfactor = self.config["freqai"]["feature_parameters"]["weight_factor"]
1✔
949
        weights = np.exp(-np.arange(num_weights) / (wfactor * num_weights))[::-1]
1✔
950
        return weights
1✔
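    # --- Editor's note: illustrative only, not part of data_kitchen.py ---------
    # The exponential recency weighting above gives the newest sample a weight
    # of 1.0 and decays the oldest towards roughly exp(-1 / weight_factor) for
    # long datasets, e.g.:
    #
    #   >>> import numpy as np
    #   >>> n, wfactor = 5, 0.5
    #   >>> np.exp(-np.arange(n) / (wfactor * n))[::-1]
    #   array([0.20189652, 0.30119421, 0.44932896, 0.67032005, 1.        ])
    # ---------------------------------------------------------------------------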
951

952
    def get_predictions_to_append(self, predictions: DataFrame,
1✔
953
                                  do_predict: npt.ArrayLike,
954
                                  dataframe_backtest: DataFrame) -> DataFrame:
955
        """
956
        Get backtest prediction from current backtest period
957
        """
958

959
        append_df = DataFrame()
1✔
960
        for label in predictions.columns:
1✔
961
            append_df[label] = predictions[label]
1✔
962
            if append_df[label].dtype == object:
1✔
963
                continue
1✔
964
            if "labels_mean" in self.data:
1✔
965
                append_df[f"{label}_mean"] = self.data["labels_mean"][label]
1✔
966
            if "labels_std" in self.data:
1✔
967
                append_df[f"{label}_std"] = self.data["labels_std"][label]
1✔
968

969
        for extra_col in self.data["extra_returns_per_train"]:
1✔
970
            append_df[f"{extra_col}"] = self.data["extra_returns_per_train"][extra_col]
×
971

972
        append_df["do_predict"] = do_predict
1✔
973
        if self.freqai_config["feature_parameters"].get("DI_threshold", 0) > 0:
1✔
974
            append_df["DI_values"] = self.DI_values
1✔
975

976
        dataframe_backtest.reset_index(drop=True, inplace=True)
1✔
977
        merged_df = pd.concat([dataframe_backtest["date"], append_df], axis=1)
1✔
978
        return merged_df
1✔
979

980
    def append_predictions(self, append_df: DataFrame) -> None:
1✔
981
        """
982
        Append backtest prediction from current backtest period to all previous periods
983
        """
984

985
        if self.full_df.empty:
1✔
986
            self.full_df = append_df
1✔
987
        else:
988
            self.full_df = pd.concat([self.full_df, append_df], axis=0, ignore_index=True)
1✔
989

990
    def fill_predictions(self, dataframe):
1✔
991
        """
992
        Back fill values to before the backtesting range so that the dataframe matches size
993
        when it goes back to the strategy. These rows are not included in the backtest.
994
        """
995
        to_keep = [col for col in dataframe.columns if not col.startswith("&")]
1✔
996
        self.return_dataframe = pd.merge(dataframe[to_keep],
1✔
997
                                         self.full_df, how='left', on='date')
998
        self.return_dataframe[self.full_df.columns] = (
1✔
999
            self.return_dataframe[self.full_df.columns].fillna(value=0))
1000
        self.full_df = DataFrame()
1✔
1001

1002
        return
1✔
1003

1004
    def create_fulltimerange(self, backtest_tr: str, backtest_period_days: int) -> str:
1✔
1005

1006
        if not isinstance(backtest_period_days, int):
1✔
1007
            raise OperationalException("backtest_period_days must be an integer")
1✔
1008

1009
        if backtest_period_days < 0:
1✔
1010
            raise OperationalException("backtest_period_days must be positive")
1✔
1011

1012
        backtest_timerange = TimeRange.parse_timerange(backtest_tr)
1✔
1013

1014
        if backtest_timerange.stopts == 0:
1✔
1015
            # typically open-ended time ranges do work; however, there are some edge cases where
1016
            # they do not. Accommodating these kinds of edge cases just to allow open-ended
1017
            # timeranges is not high enough priority to warrant the effort. It is safer for now
1018
            # to simply ask the user to add their end date
1019
            raise OperationalException("FreqAI backtesting does not allow open-ended timeranges. "
×
1020
                                       "Please indicate the end date of your desired backtesting "
1021
                                       "timerange.")
1022
            # backtest_timerange.stopts = int(
1023
            #     datetime.now(tz=timezone.utc).timestamp()
1024
            # )
1025

1026
        backtest_timerange.startts = (
1✔
1027
            backtest_timerange.startts - backtest_period_days * SECONDS_IN_DAY
1028
        )
1029
        full_timerange = backtest_timerange.timerange_str
1✔
1030
        config_path = Path(self.config["config_files"][0])
1✔
1031

1032
        if not self.full_path.is_dir():
1✔
1033
            self.full_path.mkdir(parents=True, exist_ok=True)
1✔
1034
            shutil.copy(
1✔
1035
                config_path.resolve(),
1036
                Path(self.full_path / config_path.parts[-1]),
1037
            )
1038

1039
        return full_timerange
1✔
1040

1041
    def check_if_model_expired(self, trained_timestamp: int) -> bool:
1✔
1042
        """
1043
        A model age checker to determine if the model is trustworthy based on user defined
1044
        `expiration_hours` in the configuration file.
1045
        :param trained_timestamp: int = The time of training for the most recent model.
1046
        :return:
1047
            bool = If the model is expired or not.
1048
        """
1049
        time = datetime.now(tz=timezone.utc).timestamp()
1✔
1050
        elapsed_time = (time - trained_timestamp) / 3600  # hours
1✔
1051
        max_time = self.freqai_config.get("expiration_hours", 0)
1✔
1052
        if max_time > 0:
1✔
1053
            return elapsed_time > max_time
1✔
1054
        else:
1055
            return False
×
1056

1057
    def check_if_new_training_required(
1✔
1058
        self, trained_timestamp: int
1059
    ) -> Tuple[bool, TimeRange, TimeRange]:
1060

1061
        time = datetime.now(tz=timezone.utc).timestamp()
×
1062
        trained_timerange = TimeRange()
×
1063
        data_load_timerange = TimeRange()
×
1064

1065
        timeframes = self.freqai_config["feature_parameters"].get("include_timeframes")
×
1066

1067
        max_tf_seconds = 0
×
1068
        for tf in timeframes:
×
1069
            secs = timeframe_to_seconds(tf)
×
1070
            if secs > max_tf_seconds:
×
1071
                max_tf_seconds = secs
×
1072

1073
        # We notice that users like to use exotic indicators where
1074
        # they do not know the required timeperiod. Here we include a factor
1075
        # of safety by multiplying the user-considered "max" by 2.
1076
        max_period = self.config.get('startup_candle_count', 20) * 2
×
1077
        additional_seconds = max_period * max_tf_seconds
×
1078

1079
        if trained_timestamp != 0:
×
1080
            elapsed_time = (time - trained_timestamp) / SECONDS_IN_HOUR
×
1081
            retrain = elapsed_time > self.freqai_config.get("live_retrain_hours", 0)
×
1082
            if retrain:
×
1083
                trained_timerange.startts = int(
×
1084
                    time - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
1085
                )
1086
                trained_timerange.stopts = int(time)
×
1087
                # we want to load/populate indicators on more data than we plan to train on,
1088
                # because most of the indicators have a rolling timeperiod, and are thus NaNs
1089
                # unless they have data further back in time before the start of the train period
1090
                data_load_timerange.startts = int(
×
1091
                    time
1092
                    - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
1093
                    - additional_seconds
1094
                )
1095
                data_load_timerange.stopts = int(time)
×
1096
        else:  # user passed no live_trained_timerange in config
1097
            trained_timerange.startts = int(
×
1098
                time - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
1099
            )
1100
            trained_timerange.stopts = int(time)
×
1101

1102
            data_load_timerange.startts = int(
×
1103
                time
1104
                - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
1105
                - additional_seconds
1106
            )
1107
            data_load_timerange.stopts = int(time)
×
1108
            retrain = True
×
1109

1110
        return retrain, trained_timerange, data_load_timerange
×
1111

1112
    def set_new_model_names(self, pair: str, timestamp_id: int):
1✔
1113

1114
        coin, _ = pair.split("/")
1✔
1115
        self.data_path = Path(
1✔
1116
            self.full_path
1117
            / f"sub-train-{pair.split('/')[0]}_{timestamp_id}"
1118
        )
1119

1120
        self.model_filename = f"cb_{coin.lower()}_{timestamp_id}"
1✔
1121

1122
    def set_all_pairs(self) -> None:
1✔
1123

1124
        self.all_pairs = copy.deepcopy(
1✔
1125
            self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
1126
        )
1127
        for pair in self.config.get("exchange", "").get("pair_whitelist"):
1✔
1128
            if pair not in self.all_pairs:
1✔
1129
                self.all_pairs.append(pair)
1✔
1130

1131
    def extract_corr_pair_columns_from_populated_indicators(
1✔
1132
        self,
1133
        dataframe: DataFrame
1134
    ) -> Dict[str, DataFrame]:
1135
        """
1136
        Find the columns of the dataframe corresponding to the corr_pairlist, save them
1137
        in a dictionary to be reused and attached to other pairs.
1138

1139
        :param dataframe: fully populated dataframe (current pair + corr_pairs)
1140
        :return: corr_dataframes, dictionary of dataframes to be attached
1141
                 to other pairs in same candle.
1142
        """
1143
        corr_dataframes: Dict[str, DataFrame] = {}
1✔
1144
        pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
1✔
1145

1146
        for pair in pairs:
1✔
1147
            pair = pair.replace(':', '')  # lightgbm doesn't like colons
1✔
1148
            valid_strs = [f"%-{pair}", f"%{pair}", f"%_{pair}"]
1✔
1149
            pair_cols = [col for col in dataframe.columns if
1✔
1150
                         any(substr in col for substr in valid_strs)]
1151
            if pair_cols:
1✔
1152
                pair_cols.insert(0, 'date')
1✔
1153
                corr_dataframes[pair] = dataframe.filter(pair_cols, axis=1)
1✔
1154

1155
        return corr_dataframes
1✔
1156

1157
    def attach_corr_pair_columns(self, dataframe: DataFrame,
1✔
1158
                                 corr_dataframes: Dict[str, DataFrame],
1159
                                 current_pair: str) -> DataFrame:
1160
        """
1161
        Attach the existing corr_pair dataframes to the current pair dataframe before training
1162

1163
        :param dataframe: current pair strategy dataframe, indicators populated already
1164
        :param corr_dataframes: dictionary of saved dataframes from earlier in the same candle
1165
        :param current_pair: current pair to which we will attach corr pair dataframe
1166
        :return:
1167
        :dataframe: current pair dataframe of populated indicators, concatenated with corr_pairs
1168
                    ready for training
1169
        """
1170
        pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
×
1171
        current_pair = current_pair.replace(':', '')
×
1172
        for pair in pairs:
×
1173
            pair = pair.replace(':', '')  # lightgbm doesn't work with colons
×
1174
            if current_pair != pair:
×
1175
                dataframe = dataframe.merge(corr_dataframes[pair], how='left', on='date')
×
1176

1177
        return dataframe
×
1178

1179
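    # Editor's note: a minimal sketch (not part of this module) of the merge
    # performed above: each saved corr-pair dataframe is left-joined onto the
    # current pair's dataframe on the shared 'date' column, so every pair
    # trained in the same candle reuses the same corr-pair features. Column
    # names are illustrative.
    #
    #   import pandas as pd
    #
    #   dates = pd.date_range("2023-01-01", periods=3, freq="1h")
    #   base = pd.DataFrame({"date": dates, "%-rsi-period_10_1h": [55.0, 60.0, 58.0]})
    #   corr = pd.DataFrame({"date": dates, "%-BTC/USDT-close_1h": [16500.0, 16550.0, 16530.0]})
    #   merged = base.merge(corr, how="left", on="date")
    #   # merged now carries both the base features and the corr-pair features.
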
    def use_strategy_to_populate_indicators(
1✔
        self,
        strategy: IStrategy,
        corr_dataframes: dict = {},
        base_dataframes: dict = {},
        pair: str = "",
        prediction_dataframe: DataFrame = pd.DataFrame(),
        do_corr_pairs: bool = True,
    ) -> DataFrame:
        """
        Use the user defined strategy to populate indicators during retrain
        :param strategy: IStrategy = user defined strategy object
        :param corr_dataframes: dict = dict containing the informative pair dataframes
                                (for user defined timeframes)
        :param base_dataframes: dict = dict containing the current pair dataframes
                                (for user defined timeframes)
        :param pair: str = pair for which the indicators are populated
        :param prediction_dataframe: DataFrame = live dataframe to use for inference, if any
        :param do_corr_pairs: bool = whether to also populate indicators for the corr pairs
        :return:
        dataframe: DataFrame = dataframe containing populated indicators
        """

        # for prediction dataframe creation, we let the dataprovider handle everything in the
        # strategy, so we create empty dictionaries, which allows us to pass None to
        # `populate_any_indicators()`, signaling that we want the dp to give us the live dataframe.
        tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")
1✔
        pairs: List[str] = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
1✔
        if not prediction_dataframe.empty:
1✔
            dataframe = prediction_dataframe.copy()
1✔
            for tf in tfs:
1✔
                base_dataframes[tf] = None
1✔
                for p in pairs:
1✔
                    if p not in corr_dataframes:
1✔
                        corr_dataframes[p] = {}
1✔
                    corr_dataframes[p][tf] = None
1✔
        else:
            dataframe = base_dataframes[self.config["timeframe"]].copy()
1✔

        sgi = False
1✔
        for tf in tfs:
1✔
            if tf == tfs[-1]:
1✔
                sgi = True  # doing this last allows the user to use all tf raw prices in labels
1✔
            dataframe = strategy.populate_any_indicators(
1✔
                pair,
                dataframe.copy(),
                tf,
                informative=base_dataframes[tf],
                set_generalized_indicators=sgi
            )

        # ensure corr pairs are always last
        for corr_pair in pairs:
1✔
            if pair == corr_pair:
1✔
                continue  # don't repeat anything from the whitelist
1✔
            for tf in tfs:
1✔
                if pairs and do_corr_pairs:
1✔
                    dataframe = strategy.populate_any_indicators(
1✔
                        corr_pair,
                        dataframe.copy(),
                        tf,
                        informative=corr_dataframes[corr_pair][tf]
                    )

        self.get_unique_classes_from_labels(dataframe)
1✔

        dataframe = self.remove_special_chars_from_feature_names(dataframe)
1✔

        if self.config.get('reduce_df_footprint', False):
1✔
            dataframe = reduce_dataframe_footprint(dataframe)
1✔

        return dataframe
1✔

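    # Editor's note: a hedged usage sketch (not part of this module). In a
    # FreqAI prediction model this method is typically reached with a strategy
    # object and pre-collected candle dataframes; the names `dk`, `strategy`,
    # `corr_dataframes` and `base_dataframes` below are illustrative.
    #
    #   dataframe = dk.use_strategy_to_populate_indicators(
    #       strategy,
    #       corr_dataframes=corr_dataframes,
    #       base_dataframes=base_dataframes,
    #       pair="BTC/USDT",
    #   )
    #   # `dataframe` now holds indicators for every timeframe in
    #   # `include_timeframes`, with corr-pair features appended last.
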
    def fit_labels(self) -> None:
1✔
        """
        Fit the labels with a Gaussian distribution
        """
        import scipy as spy
1✔

        self.data["labels_mean"], self.data["labels_std"] = {}, {}
1✔
        for label in self.data_dictionary["train_labels"].columns:
1✔
            if self.data_dictionary["train_labels"][label].dtype == object:
1✔
                continue
1✔
            f = spy.stats.norm.fit(self.data_dictionary["train_labels"][label])
1✔
            self.data["labels_mean"][label], self.data["labels_std"][label] = f[0], f[1]
1✔

        # in case targets are classifications
        for label in self.unique_class_list:
1✔
            self.data["labels_mean"][label], self.data["labels_std"][label] = 0, 0
1✔

        return
1✔

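    # Editor's note: an illustrative sketch (not part of this module) of the
    # per-label Gaussian fit above. `norm.fit` returns (mean, std), which are
    # what end up in self.data["labels_mean"] / self.data["labels_std"]. The
    # sample values are made up.
    #
    #   import numpy as np
    #   from scipy.stats import norm
    #
    #   train_label = np.array([0.01, -0.02, 0.005, 0.03, -0.01])
    #   mean, std = norm.fit(train_label)
    #   # mean == train_label.mean(); std is the maximum-likelihood estimate
    #   # (i.e. the population std, ddof=0).
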
    def remove_features_from_df(self, dataframe: DataFrame) -> DataFrame:
1✔
        """
        Remove the features from the dataframe before returning it to the strategy. This keeps it
        compact for FreqUI purposes.
        """
        to_keep = [
×
            col for col in dataframe.columns if not col.startswith("%") or col.startswith("%%")
        ]
        return dataframe[to_keep]
×

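    # Editor's note: a minimal sketch (not part of this module) of the column
    # filter above. Feature columns are prefixed with "%" and get dropped,
    # while "%%" columns (and everything else) survive. Column names are
    # hypothetical.
    #
    #   import pandas as pd
    #
    #   df = pd.DataFrame({"close": [1.0], "%-rsi-period_10": [55.0], "%%custom_plot": [0.5]})
    #   to_keep = [c for c in df.columns if not c.startswith("%") or c.startswith("%%")]
    #   # to_keep -> ['close', '%%custom_plot']
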
    def get_unique_classes_from_labels(self, dataframe: DataFrame) -> None:
1✔

        # self.find_features(dataframe)
        self.find_labels(dataframe)
1✔

        for key in self.label_list:
1✔
            if dataframe[key].dtype == object:
1✔
                self.unique_classes[key] = dataframe[key].dropna().unique()
1✔

        if self.unique_classes:
1✔
            for label in self.unique_classes:
1✔
                self.unique_class_list += list(self.unique_classes[label])
1✔

    def save_backtesting_prediction(
1✔
        self, append_df: DataFrame
    ) -> None:
        """
        Save prediction dataframe from backtesting to feather file format
        :param append_df: dataframe for backtesting period
        """
        full_predictions_folder = Path(self.full_path / self.backtest_predictions_folder)
1✔
        if not full_predictions_folder.is_dir():
1✔
            full_predictions_folder.mkdir(parents=True, exist_ok=True)
1✔

        append_df.to_feather(self.backtesting_results_path)
1✔

    def get_backtesting_prediction(
1✔
        self
    ) -> DataFrame:
        """
        Get prediction dataframe from feather file format
        """
        append_df = pd.read_feather(self.backtesting_results_path)
1✔
        return append_df
1✔

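    # Editor's note: an illustrative feather round trip (not part of this
    # module) for the two helpers above, using a hypothetical file name; in
    # practice the path is derived from self.model_filename.
    #
    #   import pandas as pd
    #
    #   predictions = pd.DataFrame({
    #       "date": pd.date_range("2023-01-01", periods=2, freq="1h"),
    #       "&-s_close": [0.01, -0.02],
    #   })
    #   predictions.to_feather("example_prediction.feather")
    #   restored = pd.read_feather("example_prediction.feather")
    #   # restored.equals(predictions) -> True
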
    def check_if_backtest_prediction_is_valid(
1✔
        self,
        len_backtest_df: int
    ) -> bool:
        """
        Check if a backtesting prediction already exists and if the predictions
        to append have the same size as the backtesting dataframe slice
        :param len_backtest_df: length of the backtesting dataframe slice
        :return:
        :boolean: whether the prediction file is valid.
        """
        path_to_predictionfile = Path(self.full_path /
1✔
                                      self.backtest_predictions_folder /
                                      f"{self.model_filename}_prediction.feather")
        self.backtesting_results_path = path_to_predictionfile
1✔

        file_exists = path_to_predictionfile.is_file()
1✔

        if file_exists:
1✔
            append_df = self.get_backtesting_prediction()
1✔
            if len(append_df) == len_backtest_df and 'date' in append_df:
1✔
                logger.info(f"Found backtesting prediction file at {path_to_predictionfile}")
1✔
                return True
1✔
            else:
                logger.info("A new backtesting prediction file is required. "
×
                            "(Number of predictions is different from dataframe length or "
                            "old prediction file version).")
                return False
×
        else:
            logger.info(
1✔
                f"Could not find backtesting prediction file at {path_to_predictionfile}"
            )
            return False
1✔

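    # Editor's note: the validity check above boils down to two conditions,
    # sketched here (not part of this module) with the method's own variable
    # names: the cached prediction file must exist, and it must line up with
    # the current backtest slice.
    #
    #   cached_ok = (
    #       path_to_predictionfile.is_file()
    #       and len(append_df) == len_backtest_df
    #       and 'date' in append_df
    #   )
    #   # Only when cached_ok is True is the saved prediction reused instead
    #   # of re-predicting the slice.
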
    def get_full_models_path(self, config: Config) -> Path:
1✔
        """
        Returns default FreqAI model path
        :param config: Configuration dictionary
        """
        freqai_config: Dict[str, Any] = config["freqai"]
1✔
        return Path(
1✔
            config["user_data_dir"] / "models" / str(freqai_config.get("identifier"))
        )

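    # Editor's note: a minimal sketch (not part of this module) of the path
    # built above, assuming a hypothetical user_data_dir and identifier.
    #
    #   from pathlib import Path
    #
    #   user_data_dir = Path("user_data")
    #   identifier = "example-id"
    #   full_models_path = user_data_dir / "models" / identifier
    #   # -> user_data/models/example-id
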
    def remove_special_chars_from_feature_names(self, dataframe: pd.DataFrame) -> pd.DataFrame:
1✔
        """
        Remove all special characters from feature strings (:)
        :param dataframe: the dataframe that just finished indicator population. (unfiltered)
        :return: dataframe with cleaned feature names
        """

        spec_chars = [':']
1✔
        for c in spec_chars:
1✔
            dataframe.columns = dataframe.columns.str.replace(c, "")
1✔

        return dataframe
1✔
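
    # Editor's note: an illustrative sketch (not part of this module) of the
    # column-name cleanup above on a toy dataframe; the column name is
    # hypothetical.
    #
    #   import pandas as pd
    #
    #   df = pd.DataFrame({"%-BTC/USDT:USDT-rsi_1h": [55.0]})
    #   for c in [':']:
    #       df.columns = df.columns.str.replace(c, "")
    #   # df.columns -> ['%-BTC/USDTUSDT-rsi_1h']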