freqtrade / freqtrade / 4131164979 — pending completion
push · github-actions · Matthias: "filled-date shouldn't update again"

1 of 1 new or added line in 1 file covered. (100.0%)

17024 of 17946 relevant lines covered (94.86%)

0.95 hits per line

Source File: /freqtrade/freqai/data_kitchen.py — 82.76%

import copy
import inspect
import logging
import shutil
from datetime import datetime, timezone
from math import cos, sin
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import numpy.typing as npt
import pandas as pd
import psutil
from pandas import DataFrame
from scipy import stats
from sklearn import linear_model
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors

from freqtrade.configuration import TimeRange
from freqtrade.constants import Config
from freqtrade.data.converter import reduce_dataframe_footprint
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.strategy import merge_informative_pair
from freqtrade.strategy.interface import IStrategy


SECONDS_IN_DAY = 86400
SECONDS_IN_HOUR = 3600

logger = logging.getLogger(__name__)


class FreqaiDataKitchen:
    """
    Class designed to analyze data for a single pair. Employed by the IFreqaiModel class.
    Functionalities include holding, saving, loading, and analyzing the data.

    This object is not persistent, it is reinstantiated for each coin, each time the coin
    model needs to be inferenced or trained.

    Record of contribution:
    FreqAI was developed by a group of individuals who all contributed specific skillsets to the
    project.

    Conception and software development:
    Robert Caulk @robcaulk

    Theoretical brainstorming:
    Elin Törnquist @th0rntwig

    Code review, software architecture brainstorming:
    @xmatthias

    Beta testing and bug reporting:
    @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm
    Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
    """

    def __init__(
        self,
        config: Config,
        live: bool = False,
        pair: str = "",
    ):
        self.data: Dict[str, Any] = {}
        self.data_dictionary: Dict[str, DataFrame] = {}
        self.config = config
        self.freqai_config: Dict[str, Any] = config["freqai"]
        self.full_df: DataFrame = DataFrame()
        self.append_df: DataFrame = DataFrame()
        self.data_path = Path()
        self.label_list: List = []
        self.training_features_list: List = []
        self.model_filename: str = ""
        self.backtesting_results_path = Path()
        self.backtest_predictions_folder: str = "backtesting_predictions"
        self.live = live
        self.pair = pair

        self.svm_model: linear_model.SGDOneClassSVM = None
        self.keras: bool = self.freqai_config.get("keras", False)
        self.set_all_pairs()
        self.backtest_live_models = config.get("freqai_backtest_live_models", False)

        if not self.live:
            self.full_path = self.get_full_models_path(self.config)

            if not self.backtest_live_models:
                self.full_timerange = self.create_fulltimerange(
                    self.config["timerange"], self.freqai_config.get("train_period_days", 0)
                )
                (self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
                    self.full_timerange,
                    config["freqai"]["train_period_days"],
                    config["freqai"]["backtest_period_days"],
                )

        self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})
        if not self.freqai_config.get("data_kitchen_thread_count", 0):
            self.thread_count = max(int(psutil.cpu_count() * 2 - 2), 1)
        else:
            self.thread_count = self.freqai_config["data_kitchen_thread_count"]
        self.train_dates: DataFrame = pd.DataFrame()
        self.unique_classes: Dict[str, list] = {}
        self.unique_class_list: list = []
        self.backtest_live_models_data: Dict[str, Any] = {}

    def set_paths(
        self,
        pair: str,
        trained_timestamp: Optional[int] = None,
    ) -> None:
        """
        Set the paths to the data for the present coin/botloop
        :param metadata: dict = strategy furnished pair metadata
        :param trained_timestamp: int = timestamp of most recent training
        """
        self.full_path = self.get_full_models_path(self.config)
        self.data_path = Path(
            self.full_path
            / f"sub-train-{pair.split('/')[0]}_{trained_timestamp}"
        )

        return

    def make_train_test_datasets(
        self, filtered_dataframe: DataFrame, labels: DataFrame
    ) -> Dict[Any, Any]:
        """
        Given the dataframe for the full history for training, split the data into
        training and test data according to user specified parameters in configuration
        file.
        :param filtered_dataframe: cleaned dataframe ready to be split.
        :param labels: cleaned labels ready to be split.
        """
        feat_dict = self.freqai_config["feature_parameters"]

        if 'shuffle' not in self.freqai_config['data_split_parameters']:
            self.freqai_config["data_split_parameters"].update({'shuffle': False})

        weights: npt.ArrayLike
        if feat_dict.get("weight_factor", 0) > 0:
            weights = self.set_weights_higher_recent(len(filtered_dataframe))
        else:
            weights = np.ones(len(filtered_dataframe))

        if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
            (
                train_features,
                test_features,
                train_labels,
                test_labels,
                train_weights,
                test_weights,
            ) = train_test_split(
                filtered_dataframe[: filtered_dataframe.shape[0]],
                labels,
                weights,
                **self.config["freqai"]["data_split_parameters"],
            )
        else:
            test_labels = np.zeros(2)
            test_features = pd.DataFrame()
            test_weights = np.zeros(2)
            train_features = filtered_dataframe
            train_labels = labels
            train_weights = weights

        # Simplest way to reverse the order of training and test data:
        if self.freqai_config['feature_parameters'].get('reverse_train_test_order', False):
            return self.build_data_dictionary(
                test_features, train_features, test_labels,
                train_labels, test_weights, train_weights
                )
        else:
            return self.build_data_dictionary(
                train_features, test_features, train_labels,
                test_labels, train_weights, test_weights
            )

    def filter_features(
        self,
        unfiltered_df: DataFrame,
        training_feature_list: List,
        label_list: List = list(),
        training_filter: bool = True,
    ) -> Tuple[DataFrame, DataFrame]:
        """
        Filter the unfiltered dataframe to extract the user requested features/labels and properly
        remove all NaNs. Any row with a NaN is removed from training dataset or replaced with
        0s in the prediction dataset. However, prediction dataset do_predict will reflect any
        row that had a NaN and will shield user from that prediction.

        :param unfiltered_df: the full dataframe for the present training period
        :param training_feature_list: list, the training feature list constructed by
                                      self.build_feature_list() according to user specified
                                      parameters in the configuration file.
        :param labels: the labels for the dataset
        :param training_filter: boolean which lets the function know if it is training data or
                                prediction data to be filtered.
        :returns:
        :filtered_df: dataframe cleaned of NaNs and only containing the user
        requested feature set.
        :labels: labels cleaned of NaNs.
        """
        filtered_df = unfiltered_df.filter(training_feature_list, axis=1)
        filtered_df = filtered_df.replace([np.inf, -np.inf], np.nan)

        drop_index = pd.isnull(filtered_df).any(axis=1)  # get the rows that have NaNs,
        drop_index = drop_index.replace(True, 1).replace(False, 0)  # pep8 requirement.
        if (training_filter):
            const_cols = list((filtered_df.nunique() == 1).loc[lambda x: x].index)
            if const_cols:
                filtered_df = filtered_df.filter(filtered_df.columns.difference(const_cols))
                self.data['constant_features_list'] = const_cols
                logger.warning(f"Removed features {const_cols} with constant values.")
            else:
                self.data['constant_features_list'] = []
            # we don't care about total row number (total no. datapoints) in training, we only care
            # about removing any row with NaNs
            # if labels has multiple columns (user wants to train multiple models), we detect here
            labels = unfiltered_df.filter(label_list, axis=1)
            drop_index_labels = pd.isnull(labels).any(axis=1)
            drop_index_labels = drop_index_labels.replace(True, 1).replace(False, 0)
            dates = unfiltered_df['date']
            filtered_df = filtered_df[
                (drop_index == 0) & (drop_index_labels == 0)
            ]  # dropping values
            labels = labels[
                (drop_index == 0) & (drop_index_labels == 0)
            ]  # assuming the labels depend entirely on the dataframe here.
            self.train_dates = dates[
                (drop_index == 0) & (drop_index_labels == 0)
            ]
            logger.info(
                f"dropped {len(unfiltered_df) - len(filtered_df)} training points"
                f" due to NaNs in populated dataset {len(unfiltered_df)}."
            )
            if (1 - len(filtered_df) / len(unfiltered_df)) > 0.1 and self.live:
                worst_indicator = str(unfiltered_df.count().idxmin())
                logger.warning(
                    f" {(1 - len(filtered_df)/len(unfiltered_df)) * 100:.0f} percent "
                    " of training data dropped due to NaNs, model may perform inconsistent "
                    f"with expectations. Verify {worst_indicator}"
                )
            self.data["filter_drop_index_training"] = drop_index

        else:
            if 'constant_features_list' in self.data and len(self.data['constant_features_list']):
                filtered_df = self.check_pred_labels(filtered_df)
            # we are backtesting so we need to preserve row number to send back to strategy,
            # so now we use do_predict to avoid any prediction based on a NaN
            drop_index = pd.isnull(filtered_df).any(axis=1)
            self.data["filter_drop_index_prediction"] = drop_index
            filtered_df.fillna(0, inplace=True)
            # replacing all NaNs with zeros to avoid issues in 'prediction', but any prediction
            # that was based on a single NaN is ultimately protected from buys with do_predict
            drop_index = ~drop_index
            self.do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
            if (len(self.do_predict) - self.do_predict.sum()) > 0:
                logger.info(
                    "dropped %s of %s prediction data points due to NaNs.",
                    len(self.do_predict) - self.do_predict.sum(),
                    len(filtered_df),
                )
            labels = []

        return filtered_df, labels

    def build_data_dictionary(
        self,
        train_df: DataFrame,
        test_df: DataFrame,
        train_labels: DataFrame,
        test_labels: DataFrame,
        train_weights: Any,
        test_weights: Any,
    ) -> Dict:

        self.data_dictionary = {
            "train_features": train_df,
            "test_features": test_df,
            "train_labels": train_labels,
            "test_labels": test_labels,
            "train_weights": train_weights,
            "test_weights": test_weights,
            "train_dates": self.train_dates
        }

        return self.data_dictionary

    def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
        """
        Normalize all data in the data_dictionary according to the training dataset
        :param data_dictionary: dictionary containing the cleaned and
                                split training/test data/labels
        :returns:
        :data_dictionary: updated dictionary with standardized values.
        """

        # standardize the data by training stats
        train_max = data_dictionary["train_features"].max()
        train_min = data_dictionary["train_features"].min()
        data_dictionary["train_features"] = (
            2 * (data_dictionary["train_features"] - train_min) / (train_max - train_min) - 1
        )
        data_dictionary["test_features"] = (
            2 * (data_dictionary["test_features"] - train_min) / (train_max - train_min) - 1
        )

        for item in train_max.keys():
            self.data[item + "_max"] = train_max[item]
            self.data[item + "_min"] = train_min[item]

        for item in data_dictionary["train_labels"].keys():
            if data_dictionary["train_labels"][item].dtype == object:
                continue
            train_labels_max = data_dictionary["train_labels"][item].max()
            train_labels_min = data_dictionary["train_labels"][item].min()
            data_dictionary["train_labels"][item] = (
                2
                * (data_dictionary["train_labels"][item] - train_labels_min)
                / (train_labels_max - train_labels_min)
                - 1
            )
            if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
                data_dictionary["test_labels"][item] = (
                    2
                    * (data_dictionary["test_labels"][item] - train_labels_min)
                    / (train_labels_max - train_labels_min)
                    - 1
                )

            self.data[f"{item}_max"] = train_labels_max
            self.data[f"{item}_min"] = train_labels_min
        return data_dictionary

    def normalize_single_dataframe(self, df: DataFrame) -> DataFrame:

        train_max = df.max()
        train_min = df.min()
        df = (
            2 * (df - train_min) / (train_max - train_min) - 1
        )

        for item in train_max.keys():
            self.data[item + "_max"] = train_max[item]
            self.data[item + "_min"] = train_min[item]

        return df

    def normalize_data_from_metadata(self, df: DataFrame) -> DataFrame:
        """
        Normalize a set of data using the mean and standard deviation from
        the associated training data.
        :param df: Dataframe to be standardized
        """

        train_max = [None] * len(df.keys())
        train_min = [None] * len(df.keys())

        for i, item in enumerate(df.keys()):
            train_max[i] = self.data[f"{item}_max"]
            train_min[i] = self.data[f"{item}_min"]

        train_max_series = pd.Series(train_max, index=df.keys())
        train_min_series = pd.Series(train_min, index=df.keys())

        df = (
            2 * (df - train_min_series) / (train_max_series - train_min_series) - 1
        )

        return df

    def denormalize_labels_from_metadata(self, df: DataFrame) -> DataFrame:
        """
        Denormalize a set of data using the mean and standard deviation from
        the associated training data.
        :param df: Dataframe of predictions to be denormalized
        """

        for label in df.columns:
            if df[label].dtype == object or label in self.unique_class_list:
                continue
            df[label] = (
                (df[label] + 1)
                * (self.data[f"{label}_max"] - self.data[f"{label}_min"])
                / 2
            ) + self.data[f"{label}_min"]

        return df

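    # Illustrative sketch (not part of the original file): the normalize/denormalize pair
    # above maps each feature or label x into [-1, 1] using the training min/max and
    # inverts that mapping on predictions:
    #
    #     x_norm = 2 * (x - train_min) / (train_max - train_min) - 1
    #     x      = (x_norm + 1) * (train_max - train_min) / 2 + train_min
    #
    # e.g. with train_min=10 and train_max=20, a value of 15 normalizes to 0.0 and
    # denormalizes back to 15.
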
    def split_timerange(
        self, tr: str, train_split: int = 28, bt_split: float = 7
    ) -> Tuple[list, list]:
        """
        Function which takes a single time range (tr) and splits it
        into sub timeranges to train and backtest on based on user input
        tr: str, full timerange to train on
        train_split: the period length for each training (days). Specified in user
        configuration file
        bt_split: the backtesting length (days). Specified in user configuration file
        """

        if not isinstance(train_split, int) or train_split < 1:
            raise OperationalException(
                f"train_period_days must be an integer greater than 0. Got {train_split}."
            )
        train_period_days = train_split * SECONDS_IN_DAY
        bt_period = bt_split * SECONDS_IN_DAY

        full_timerange = TimeRange.parse_timerange(tr)
        config_timerange = TimeRange.parse_timerange(self.config["timerange"])
        if config_timerange.stopts == 0:
            config_timerange.stopts = int(
                datetime.now(tz=timezone.utc).timestamp()
            )
        timerange_train = copy.deepcopy(full_timerange)
        timerange_backtest = copy.deepcopy(full_timerange)

        tr_training_list = []
        tr_backtesting_list = []
        tr_training_list_timerange = []
        tr_backtesting_list_timerange = []
        first = True

        while True:
            if not first:
                timerange_train.startts = timerange_train.startts + int(bt_period)
            timerange_train.stopts = timerange_train.startts + train_period_days

            first = False
            tr_training_list.append(timerange_train.timerange_str)
            tr_training_list_timerange.append(copy.deepcopy(timerange_train))

            # associated backtest period

            timerange_backtest.startts = timerange_train.stopts

            timerange_backtest.stopts = timerange_backtest.startts + int(bt_period)

            if timerange_backtest.stopts > config_timerange.stopts:
                timerange_backtest.stopts = config_timerange.stopts

            tr_backtesting_list.append(timerange_backtest.timerange_str)
            tr_backtesting_list_timerange.append(copy.deepcopy(timerange_backtest))

            # ensure we are predicting on exactly same amount of data as requested by user defined
            #  --timerange
            if timerange_backtest.stopts == config_timerange.stopts:
                break

        # print(tr_training_list, tr_backtesting_list)
        return tr_training_list_timerange, tr_backtesting_list_timerange

    def slice_dataframe(self, timerange: TimeRange, df: DataFrame) -> DataFrame:
        """
        Given a full dataframe, extract the user desired window
        :param tr: timerange string that we wish to extract from df
        :param df: Dataframe containing all candles to run the entire backtest. Here
                   it is sliced down to just the present training period.
        """
        if not self.live:
            df = df.loc[(df["date"] >= timerange.startdt) & (df["date"] < timerange.stopdt), :]
        else:
            df = df.loc[df["date"] >= timerange.startdt, :]

        return df

    def check_pred_labels(self, df_predictions: DataFrame) -> DataFrame:
        """
        Check that prediction feature labels match training feature labels.
        :param df_predictions: incoming predictions
        """
        constant_labels = self.data['constant_features_list']
        df_predictions = df_predictions.filter(
            df_predictions.columns.difference(constant_labels)
        )
        logger.warning(
            f"Removed {len(constant_labels)} features from prediction features, "
            f"these were considered constant values during most recent training."
        )

        return df_predictions

    def principal_component_analysis(self) -> None:
        """
        Performs Principal Component Analysis on the data for dimensionality reduction
        and outlier detection (see self.remove_outliers())
        No parameters or returns, it acts on the data_dictionary held by the DataHandler.
        """

        from sklearn.decomposition import PCA  # avoid importing if we don't need it

        pca = PCA(0.999)
        pca = pca.fit(self.data_dictionary["train_features"])
        n_keep_components = pca.n_components_
        self.data["n_kept_components"] = n_keep_components
        n_components = self.data_dictionary["train_features"].shape[1]
        logger.info("reduced feature dimension by %s", n_components - n_keep_components)
        logger.info("explained variance %f", np.sum(pca.explained_variance_ratio_))

        train_components = pca.transform(self.data_dictionary["train_features"])
        self.data_dictionary["train_features"] = pd.DataFrame(
            data=train_components,
            columns=["PC" + str(i) for i in range(0, n_keep_components)],
            index=self.data_dictionary["train_features"].index,
        )
        # normalising transformed training features
        self.data_dictionary["train_features"] = self.normalize_single_dataframe(
            self.data_dictionary["train_features"])

        # keeping a copy of the non-transformed features so we can check for errors during
        # model load from disk
        self.data["training_features_list_raw"] = copy.deepcopy(self.training_features_list)
        self.training_features_list = self.data_dictionary["train_features"].columns

        if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
            test_components = pca.transform(self.data_dictionary["test_features"])
            self.data_dictionary["test_features"] = pd.DataFrame(
                data=test_components,
                columns=["PC" + str(i) for i in range(0, n_keep_components)],
                index=self.data_dictionary["test_features"].index,
            )
            # normalise transformed test feature to transformed training features
            self.data_dictionary["test_features"] = self.normalize_data_from_metadata(
                self.data_dictionary["test_features"])

        self.data["n_kept_components"] = n_keep_components
        self.pca = pca

        logger.info(f"PCA reduced total features from  {n_components} to {n_keep_components}")

        if not self.data_path.is_dir():
            self.data_path.mkdir(parents=True, exist_ok=True)

        return None

    def pca_transform(self, filtered_dataframe: DataFrame) -> None:
        """
        Use an existing pca transform to transform data into components
        :param filtered_dataframe: DataFrame = the cleaned dataframe
        """
        pca_components = self.pca.transform(filtered_dataframe)
        self.data_dictionary["prediction_features"] = pd.DataFrame(
            data=pca_components,
            columns=["PC" + str(i) for i in range(0, self.data["n_kept_components"])],
            index=filtered_dataframe.index,
        )
        # normalise transformed predictions to transformed training features
        self.data_dictionary["prediction_features"] = self.normalize_data_from_metadata(
            self.data_dictionary["prediction_features"])

    def compute_distances(self) -> float:
        """
        Compute distances between each training point and every other training
        point. This metric defines the neighborhood of trained data and is used
        for prediction confidence in the Dissimilarity Index
        """
        # logger.info("computing average mean distance for all training points")
        pairwise = pairwise_distances(
            self.data_dictionary["train_features"], n_jobs=self.thread_count)
        # remove the diagonal distances which are itself distances ~0
        np.fill_diagonal(pairwise, np.NaN)
        pairwise = pairwise.reshape(-1, 1)
        avg_mean_dist = pairwise[~np.isnan(pairwise)].mean()

        return avg_mean_dist

    def get_outlier_percentage(self, dropped_pts: npt.NDArray) -> float:
        """
        Check if more than X% of points were dropped during outlier detection.
        """
        outlier_protection_pct = self.freqai_config["feature_parameters"].get(
            "outlier_protection_percentage", 30)
        outlier_pct = (dropped_pts.sum() / len(dropped_pts)) * 100
        if outlier_pct >= outlier_protection_pct:
            return outlier_pct
        else:
            return 0.0

    def use_SVM_to_remove_outliers(self, predict: bool) -> None:
        """
        Build/inference a Support Vector Machine to detect outliers
        in training data and prediction
        :param predict: bool = If true, inference an existing SVM model, else construct one
        """

        if self.keras:
            logger.warning(
                "SVM outlier removal not currently supported for Keras based models. "
                "Skipping user requested function."
            )
            if predict:
                self.do_predict = np.ones(len(self.data_dictionary["prediction_features"]))
            return

        if predict:
            if not self.svm_model:
                logger.warning("No svm model available for outlier removal")
                return
            y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
            do_predict = np.where(y_pred == -1, 0, y_pred)

            if (len(do_predict) - do_predict.sum()) > 0:
                logger.info(f"SVM tossed {len(do_predict) - do_predict.sum()} predictions.")
            self.do_predict += do_predict
            self.do_predict -= 1

        else:
            # use SGDOneClassSVM to increase speed?
            svm_params = self.freqai_config["feature_parameters"].get(
                "svm_params", {"shuffle": False, "nu": 0.1})
            self.svm_model = linear_model.SGDOneClassSVM(**svm_params).fit(
                self.data_dictionary["train_features"]
            )
            y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
            kept_points = np.where(y_pred == -1, 0, y_pred)
            # keep_index = np.where(y_pred == 1)
            outlier_pct = self.get_outlier_percentage(1 - kept_points)
            if outlier_pct:
                logger.warning(
                        f"SVM detected {outlier_pct:.2f}% of the points as outliers. "
                        f"Keeping original dataset."
                )
                self.svm_model = None
                return

            self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
                (y_pred == 1)
            ]
            self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
                (y_pred == 1)
            ]
            self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
                (y_pred == 1)
            ]

            logger.info(
                f"SVM tossed {len(y_pred) - kept_points.sum()}"
                f" train points from {len(y_pred)} total points."
            )

            # same for test data
            # TODO: This (and the part above) could be refactored into a separate function
            # to reduce code duplication
            if self.freqai_config['data_split_parameters'].get('test_size', 0.1) != 0:
                y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
                kept_points = np.where(y_pred == -1, 0, y_pred)
                self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
                    (y_pred == 1)
                ]
                self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][(
                    y_pred == 1)]
                self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
                    (y_pred == 1)
                ]

            logger.info(
                f"SVM tossed {len(y_pred) - kept_points.sum()}"
                f" test points from {len(y_pred)} total points."
            )

        return

    def use_DBSCAN_to_remove_outliers(self, predict: bool, eps=None) -> None:
        """
        Use DBSCAN to cluster training data and remove "noisy" data (read outliers).
        User controls this via the config param `DBSCAN_outlier_pct` which indicates the
        pct of training data that they want to be considered outliers.
        :param predict: bool = If False (training), iterate to find the best hyper parameters
                        to match user requested outlier percent target.
                        If True (prediction), use the parameters determined from
                        the previous training to estimate if the current prediction point
                        is an outlier.
        """

        if predict:
            if not self.data['DBSCAN_eps']:
                return
            train_ft_df = self.data_dictionary['train_features']
            pred_ft_df = self.data_dictionary['prediction_features']
            num_preds = len(pred_ft_df)
            df = pd.concat([train_ft_df, pred_ft_df], axis=0, ignore_index=True)
            clustering = DBSCAN(eps=self.data['DBSCAN_eps'],
                                min_samples=self.data['DBSCAN_min_samples'],
                                n_jobs=self.thread_count
                                ).fit(df)
            do_predict = np.where(clustering.labels_[-num_preds:] == -1, 0, 1)

            if (len(do_predict) - do_predict.sum()) > 0:
                logger.info(f"DBSCAN tossed {len(do_predict) - do_predict.sum()} predictions")
            self.do_predict += do_predict
            self.do_predict -= 1

        else:

            def normalise_distances(distances):
                normalised_distances = (distances - distances.min()) / \
                                        (distances.max() - distances.min())
                return normalised_distances

            def rotate_point(origin, point, angle):
                # rotate a point counterclockwise by a given angle (in radians)
                # around a given origin
                x = origin[0] + cos(angle) * (point[0] - origin[0]) - \
                                    sin(angle) * (point[1] - origin[1])
                y = origin[1] + sin(angle) * (point[0] - origin[0]) + \
                    cos(angle) * (point[1] - origin[1])
                return (x, y)

            MinPts = int(len(self.data_dictionary['train_features'].index) * 0.25)
            # measure pairwise distances to nearest neighbours
            neighbors = NearestNeighbors(
                n_neighbors=MinPts, n_jobs=self.thread_count)
            neighbors_fit = neighbors.fit(self.data_dictionary['train_features'])
            distances, _ = neighbors_fit.kneighbors(self.data_dictionary['train_features'])
            distances = np.sort(distances, axis=0).mean(axis=1)

            normalised_distances = normalise_distances(distances)
            x_range = np.linspace(0, 1, len(distances))
            line = np.linspace(normalised_distances[0],
                               normalised_distances[-1], len(normalised_distances))
            deflection = np.abs(normalised_distances - line)
            max_deflection_loc = np.where(deflection == deflection.max())[0][0]
            origin = x_range[max_deflection_loc], line[max_deflection_loc]
            point = x_range[max_deflection_loc], normalised_distances[max_deflection_loc]
            rot_angle = np.pi / 4
            elbow_loc = rotate_point(origin, point, rot_angle)

            epsilon = elbow_loc[1] * (distances[-1] - distances[0]) + distances[0]

            clustering = DBSCAN(eps=epsilon, min_samples=MinPts,
                                n_jobs=int(self.thread_count)).fit(
                                                    self.data_dictionary['train_features']
                                                )

            logger.info(f'DBSCAN found eps of {epsilon:.2f}.')

            self.data['DBSCAN_eps'] = epsilon
            self.data['DBSCAN_min_samples'] = MinPts
            dropped_points = np.where(clustering.labels_ == -1, 1, 0)

            outlier_pct = self.get_outlier_percentage(dropped_points)
            if outlier_pct:
                logger.warning(
                        f"DBSCAN detected {outlier_pct:.2f}% of the points as outliers. "
                        f"Keeping original dataset."
                )
                self.data['DBSCAN_eps'] = 0
                return

            self.data_dictionary['train_features'] = self.data_dictionary['train_features'][
                (clustering.labels_ != -1)
            ]
            self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
                (clustering.labels_ != -1)
            ]
            self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
                (clustering.labels_ != -1)
            ]

            logger.info(
                f"DBSCAN tossed {dropped_points.sum()}"
                f" train points from {len(clustering.labels_)}"
            )

        return

    def compute_inlier_metric(self, set_='train') -> None:
        """
        Compute inlier metric from backwards distance distributions.
        This metric defines how well features from a timepoint fit
        into previous timepoints.
        """

        def normalise(dataframe: DataFrame, key: str) -> DataFrame:
            if set_ == 'train':
                min_value = dataframe.min()
                max_value = dataframe.max()
                self.data[f'{key}_min'] = min_value
                self.data[f'{key}_max'] = max_value
            else:
                min_value = self.data[f'{key}_min']
                max_value = self.data[f'{key}_max']
            return (dataframe - min_value) / (max_value - min_value)

        no_prev_pts = self.freqai_config["feature_parameters"]["inlier_metric_window"]

        if set_ == 'train':
            compute_df = copy.deepcopy(self.data_dictionary['train_features'])
        elif set_ == 'test':
            compute_df = copy.deepcopy(self.data_dictionary['test_features'])
        else:
            compute_df = copy.deepcopy(self.data_dictionary['prediction_features'])

        compute_df_reindexed = compute_df.reindex(
            index=np.flip(compute_df.index)
        )

        pairwise = pd.DataFrame(
            np.triu(
                pairwise_distances(compute_df_reindexed, n_jobs=self.thread_count)
            ),
            columns=compute_df_reindexed.index,
            index=compute_df_reindexed.index
        )
        pairwise = pairwise.round(5)

        column_labels = [
            '{}{}'.format('d', i) for i in range(1, no_prev_pts + 1)
        ]
        distances = pd.DataFrame(
            columns=column_labels, index=compute_df.index
        )

        for index in compute_df.index[no_prev_pts:]:
            current_row = pairwise.loc[[index]]
            current_row_no_zeros = current_row.loc[
                :, (current_row != 0).any(axis=0)
            ]
            distances.loc[[index]] = current_row_no_zeros.iloc[
                :, :no_prev_pts
            ]
        distances = distances.replace([np.inf, -np.inf], np.nan)
        drop_index = pd.isnull(distances).any(axis=1)
        distances = distances[drop_index == 0]

        inliers = pd.DataFrame(index=distances.index)
        for key in distances.keys():
            current_distances = distances[key].dropna()
            current_distances = normalise(current_distances, key)
            if set_ == 'train':
                fit_params = stats.weibull_min.fit(current_distances)
                self.data[f'{key}_fit_params'] = fit_params
            else:
                fit_params = self.data[f'{key}_fit_params']
            quantiles = stats.weibull_min.cdf(current_distances, *fit_params)

            df_inlier = pd.DataFrame(
                {key: quantiles}, index=distances.index
            )
            inliers = pd.concat(
                [inliers, df_inlier], axis=1
            )

        inlier_metric = pd.DataFrame(
            data=inliers.sum(axis=1) / no_prev_pts,
            columns=['%-inlier_metric'],
            index=compute_df.index
        )

        inlier_metric = (2 * (inlier_metric - inlier_metric.min()) /
                         (inlier_metric.max() - inlier_metric.min()) - 1)

        if set_ in ('train', 'test'):
            inlier_metric = inlier_metric.iloc[no_prev_pts:]
            compute_df = compute_df.iloc[no_prev_pts:]
            self.remove_beginning_points_from_data_dict(set_, no_prev_pts)
            self.data_dictionary[f'{set_}_features'] = pd.concat(
                [compute_df, inlier_metric], axis=1)
        else:
            self.data_dictionary['prediction_features'] = pd.concat(
                [compute_df, inlier_metric], axis=1)
            self.data_dictionary['prediction_features'].fillna(0, inplace=True)

        logger.info('Inlier metric computed and added to features.')

        return None

    def remove_beginning_points_from_data_dict(self, set_='train', no_prev_pts: int = 10):
        features = self.data_dictionary[f'{set_}_features']
        weights = self.data_dictionary[f'{set_}_weights']
        labels = self.data_dictionary[f'{set_}_labels']
        self.data_dictionary[f'{set_}_weights'] = weights[no_prev_pts:]
        self.data_dictionary[f'{set_}_features'] = features.iloc[no_prev_pts:]
        self.data_dictionary[f'{set_}_labels'] = labels.iloc[no_prev_pts:]

    def add_noise_to_training_features(self) -> None:
        """
        Add noise to train features to reduce the risk of overfitting.
        """
        mu = 0  # no shift
        sigma = self.freqai_config["feature_parameters"]["noise_standard_deviation"]
        compute_df = self.data_dictionary['train_features']
        noise = np.random.normal(mu, sigma, [compute_df.shape[0], compute_df.shape[1]])
        self.data_dictionary['train_features'] += noise
        return

    def find_features(self, dataframe: DataFrame) -> None:
        """
        Find features in the strategy provided dataframe
        :param dataframe: DataFrame = strategy provided dataframe
        :return:
        features: list = the features to be used for training/prediction
        """
        column_names = dataframe.columns
        features = [c for c in column_names if "%" in c]

        if not features:
            raise OperationalException("Could not find any features!")

        self.training_features_list = features

    def find_labels(self, dataframe: DataFrame) -> None:
        column_names = dataframe.columns
        labels = [c for c in column_names if "&" in c]
        self.label_list = labels

    def check_if_pred_in_training_spaces(self) -> None:
        """
        Compares the distance from each prediction point to each training data
        point. It uses this information to estimate a Dissimilarity Index (DI)
        and avoid making predictions on any points that are too far away
        from the training data set.
        """

        distance = pairwise_distances(
            self.data_dictionary["train_features"],
            self.data_dictionary["prediction_features"],
            n_jobs=self.thread_count,
        )

        self.DI_values = distance.min(axis=0) / self.data["avg_mean_dist"]

        do_predict = np.where(
            self.DI_values < self.freqai_config["feature_parameters"]["DI_threshold"],
            1,
            0,
        )

        if (len(do_predict) - do_predict.sum()) > 0:
            logger.info(
                f"DI tossed {len(do_predict) - do_predict.sum()} predictions for "
                "being too far from training data."
            )

        self.do_predict += do_predict
        self.do_predict -= 1

    def set_weights_higher_recent(self, num_weights: int) -> npt.ArrayLike:
        """
        Set weights so that recent data is more heavily weighted during
        training than older data.
        """
        wfactor = self.config["freqai"]["feature_parameters"]["weight_factor"]
        weights = np.exp(-np.arange(num_weights) / (wfactor * num_weights))[::-1]
        return weights

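    # Illustrative sketch (not part of the original file): with weight_factor = w and
    # num_weights = n, the recency weights above decay exponentially from 1.0 at the
    # newest row going backwards in time:
    #
    #     weights[i] = exp(-(n - 1 - i) / (w * n))   for i = 0 .. n - 1
    #
    # e.g. num_weights=4, weight_factor=0.5: np.exp(-np.arange(4) / 2)[::-1] gives
    # approximately [0.223, 0.368, 0.607, 1.0], so the most recent candle carries
    # the largest training weight.
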
954
    def get_predictions_to_append(self, predictions: DataFrame,
1✔
955
                                  do_predict: npt.ArrayLike,
956
                                  dataframe_backtest: DataFrame) -> DataFrame:
957
        """
958
        Get backtest prediction from current backtest period
959
        """
960

961
        append_df = DataFrame()
1✔
962
        for label in predictions.columns:
1✔
963
            append_df[label] = predictions[label]
1✔
964
            if append_df[label].dtype == object:
1✔
965
                continue
1✔
966
            if "labels_mean" in self.data:
1✔
967
                append_df[f"{label}_mean"] = self.data["labels_mean"][label]
1✔
968
            if "labels_std" in self.data:
1✔
969
                append_df[f"{label}_std"] = self.data["labels_std"][label]
1✔
970

971
        for extra_col in self.data["extra_returns_per_train"]:
1✔
972
            append_df[f"{extra_col}"] = self.data["extra_returns_per_train"][extra_col]
×
973

974
        append_df["do_predict"] = do_predict
1✔
975
        if self.freqai_config["feature_parameters"].get("DI_threshold", 0) > 0:
1✔
976
            append_df["DI_values"] = self.DI_values
1✔
977

978
        dataframe_backtest.reset_index(drop=True, inplace=True)
1✔
979
        merged_df = pd.concat([dataframe_backtest["date"], append_df], axis=1)
1✔
980
        return merged_df
1✔
981

982
    def append_predictions(self, append_df: DataFrame) -> None:
1✔
983
        """
984
        Append backtest prediction from current backtest period to all previous periods
985
        """
986

987
        if self.full_df.empty:
1✔
988
            self.full_df = append_df
1✔
989
        else:
990
            self.full_df = pd.concat([self.full_df, append_df], axis=0, ignore_index=True)
1✔
991

992
    def fill_predictions(self, dataframe):
1✔
993
        """
994
        Back fill values to before the backtesting range so that the dataframe matches size
995
        when it goes back to the strategy. These rows are not included in the backtest.
996
        """
997
        to_keep = [col for col in dataframe.columns if not col.startswith("&")]
1✔
998
        self.return_dataframe = pd.merge(dataframe[to_keep],
1✔
999
                                         self.full_df, how='left', on='date')
1000
        self.return_dataframe[self.full_df.columns] = (
1✔
1001
            self.return_dataframe[self.full_df.columns].fillna(value=0))
1002
        self.full_df = DataFrame()
1✔
1003

1004
        return
1✔
1005

1006
    def create_fulltimerange(self, backtest_tr: str, backtest_period_days: int) -> str:
1✔
1007

1008
        if not isinstance(backtest_period_days, int):
1✔
1009
            raise OperationalException("backtest_period_days must be an integer")
1✔
1010

1011
        if backtest_period_days < 0:
1✔
1012
            raise OperationalException("backtest_period_days must be positive")
1✔
1013

1014
        backtest_timerange = TimeRange.parse_timerange(backtest_tr)
1✔
1015

1016
        if backtest_timerange.stopts == 0:
1✔
1017
            # typically open ended time ranges do work, however, there are some edge cases where
1018
            # it does not. accommodating these kinds of edge cases just to allow open-ended
1019
            # timerange is not high enough priority to warrant the effort. It is safer for now
1020
            # to simply ask user to add their end date
1021
            raise OperationalException("FreqAI backtesting does not allow open ended timeranges. "
×
1022
                                       "Please indicate the end date of your desired backtesting. "
1023
                                       "timerange.")
1024
            # backtest_timerange.stopts = int(
1025
            #     datetime.now(tz=timezone.utc).timestamp()
1026
            # )
1027

1028
        backtest_timerange.startts = (
1✔
1029
            backtest_timerange.startts - backtest_period_days * SECONDS_IN_DAY
1030
        )
1031
        full_timerange = backtest_timerange.timerange_str
1✔
1032
        config_path = Path(self.config["config_files"][0])
1✔
1033

1034
        if not self.full_path.is_dir():
1✔
1035
            self.full_path.mkdir(parents=True, exist_ok=True)
1✔
1036
            shutil.copy(
1✔
1037
                config_path.resolve(),
1038
                Path(self.full_path / config_path.parts[-1]),
1039
            )
1040

1041
        return full_timerange
1✔
1042

1043
    def check_if_model_expired(self, trained_timestamp: int) -> bool:
1✔
1044
        """
1045
        A model age checker to determine if the model is trustworthy based on user defined
1046
        `expiration_hours` in the configuration file.
1047
        :param trained_timestamp: int = The time of training for the most recent model.
1048
        :return:
1049
            bool = If the model is expired or not.
1050
        """
1051
        time = datetime.now(tz=timezone.utc).timestamp()
1✔
1052
        elapsed_time = (time - trained_timestamp) / 3600  # hours
1✔
1053
        max_time = self.freqai_config.get("expiration_hours", 0)
1✔
1054
        if max_time > 0:
1✔
1055
            return elapsed_time > max_time
1✔
1056
        else:
1057
            return False
×
1058

1059
    def check_if_new_training_required(
1✔
1060
        self, trained_timestamp: int
1061
    ) -> Tuple[bool, TimeRange, TimeRange]:
1062

1063
        time = datetime.now(tz=timezone.utc).timestamp()
×
1064
        trained_timerange = TimeRange()
×
1065
        data_load_timerange = TimeRange()
×
1066

1067
        timeframes = self.freqai_config["feature_parameters"].get("include_timeframes")
×
1068

1069
        max_tf_seconds = 0
×
1070
        for tf in timeframes:
×
1071
            secs = timeframe_to_seconds(tf)
×
1072
            if secs > max_tf_seconds:
×
1073
                max_tf_seconds = secs
×
1074

1075
        # We notice that users like to use exotic indicators where
1076
        # they do not know the required timeperiod. Here we include a factor
1077
        # of safety by multiplying the user considered "max" by 2.
1078
        max_period = self.config.get('startup_candle_count', 20) * 2
×
1079
        additional_seconds = max_period * max_tf_seconds
×
1080

1081
        if trained_timestamp != 0:
×
1082
            elapsed_time = (time - trained_timestamp) / SECONDS_IN_HOUR
×
1083
            retrain = elapsed_time > self.freqai_config.get("live_retrain_hours", 0)
×
1084
            if retrain:
×
1085
                trained_timerange.startts = int(
×
1086
                    time - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
1087
                )
1088
                trained_timerange.stopts = int(time)
×
1089
                # we want to load/populate indicators on more data than we plan to train on so
1090
                # because most of the indicators have a rolling timeperiod, and are thus NaNs
1091
                # unless they have data further back in time before the start of the train period
1092
                data_load_timerange.startts = int(
×
1093
                    time
1094
                    - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
1095
                    - additional_seconds
1096
                )
1097
                data_load_timerange.stopts = int(time)
×
1098
        else:  # user passed no live_trained_timerange in config
1099
            trained_timerange.startts = int(
×
1100
                time - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
1101
            )
1102
            trained_timerange.stopts = int(time)
×
1103

1104
            data_load_timerange.startts = int(
×
1105
                time
1106
                - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
1107
                - additional_seconds
1108
            )
1109
            data_load_timerange.stopts = int(time)
×
1110
            retrain = True
×
1111

1112
        return retrain, trained_timerange, data_load_timerange
×
1113

1114
    def set_new_model_names(self, pair: str, timestamp_id: int):
1✔
1115

1116
        coin, _ = pair.split("/")
1✔
1117
        self.data_path = Path(
1✔
1118
            self.full_path
1119
            / f"sub-train-{pair.split('/')[0]}_{timestamp_id}"
1120
        )
1121

1122
        self.model_filename = f"cb_{coin.lower()}_{timestamp_id}"
1✔
1123

1124
    def set_all_pairs(self) -> None:
1✔
1125

1126
        self.all_pairs = copy.deepcopy(
1✔
1127
            self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
1128
        )
1129
        for pair in self.config.get("exchange", "").get("pair_whitelist"):
1✔
1130
            if pair not in self.all_pairs:
1✔
1131
                self.all_pairs.append(pair)
1✔
1132

1133
    def extract_corr_pair_columns_from_populated_indicators(
1✔
1134
        self,
1135
        dataframe: DataFrame
1136
    ) -> Dict[str, DataFrame]:
1137
        """
1138
        Find the columns of the dataframe corresponding to the corr_pairlist, save them
1139
        in a dictionary to be reused and attached to other pairs.
1140

1141
        :param dataframe: fully populated dataframe (current pair + corr_pairs)
1142
        :return: corr_dataframes, dictionary of dataframes to be attached
1143
                 to other pairs in same candle.
1144
        """
1145
        corr_dataframes: Dict[str, DataFrame] = {}
1✔
1146
        pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
1✔
1147

1148
        for pair in pairs:
1✔
1149
            pair = pair.replace(':', '')  # lightgbm doesnt like colons
1✔
1150
            pair_cols = [col for col in dataframe.columns if col.startswith("%")
1✔
1151
                         and f"{pair}_" in col]
1152

1153
            if pair_cols:
1✔
1154
                pair_cols.insert(0, 'date')
1✔
1155
                corr_dataframes[pair] = dataframe.filter(pair_cols, axis=1)
1✔
1156

1157
        return corr_dataframes
1✔
1158

1159
    def attach_corr_pair_columns(self, dataframe: DataFrame,
1✔
1160
                                 corr_dataframes: Dict[str, DataFrame],
1161
                                 current_pair: str) -> DataFrame:
1162
        """
1163
        Attach the existing corr_pair dataframes to the current pair dataframe before training
1164

1165
        :param dataframe: current pair strategy dataframe, indicators populated already
1166
        :param corr_dataframes: dictionary of saved dataframes from earlier in the same candle
1167
        :param current_pair: current pair to which we will attach corr pair dataframe
1168
        :return:
1169
        :dataframe: current pair dataframe of populated indicators, concatenated with corr_pairs
1170
                    ready for training
1171
        """
1172
        pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
×
1173
        current_pair = current_pair.replace(':', '')
×
1174
        for pair in pairs:
×
1175
            pair = pair.replace(':', '')  # lightgbm doesnt work with colons
×
1176
            if current_pair != pair:
×
1177
                dataframe = dataframe.merge(corr_dataframes[pair], how='left', on='date')
×
1178

1179
        return dataframe
×
1180
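
# Illustrative, standalone sketch (hypothetical frames): attaching a saved corr-pair
# feature slice to the current pair's dataframe with a left merge on 'date', the same
# pattern attach_corr_pair_columns() applies for every pair other than the current one.
import pandas as pd

dates = pd.date_range("2022-01-01", periods=3, freq="1h")
current_df = pd.DataFrame({"date": dates, "%-ETH/USDT_rsi_5m": [55.0, 52.0, 49.0]})
btc_slice = pd.DataFrame({"date": dates, "%-BTC/USDT_rsi_5m": [40.0, 45.0, 50.0]})

merged = current_df.merge(btc_slice, how="left", on="date")
# merged now carries both pairs' features, aligned on the shared candle dates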

1181
    def get_pair_data_for_features(self,
1✔
1182
                                   pair: str,
1183
                                   tf: str,
1184
                                   strategy: IStrategy,
1185
                                   corr_dataframes: dict = {},
1186
                                   base_dataframes: dict = {},
1187
                                   is_corr_pairs: bool = False) -> DataFrame:
1188
        """
1189
        Get the data for the pair. If it's not in the dictionary, get it from the data provider
1190
        :param pair: str = pair to get data for
1191
        :param tf: str = timeframe to get data for
1192
        :param strategy: IStrategy = user defined strategy object
1193
        :param corr_dataframes: dict = dict containing the df pair dataframes
1194
                                (for user defined timeframes)
1195
        :param base_dataframes: dict = dict containing the current pair dataframes
1196
                                (for user defined timeframes)
1197
        :param is_corr_pairs: bool = whether the pair is a corr pair or not
1198
        :return: dataframe = dataframe containing the pair data
1199
        """
1200
        if is_corr_pairs:
1✔
1201
            dataframe = corr_dataframes[pair][tf]
1✔
1202
            if not dataframe.empty:
1✔
1203
                return dataframe
1✔
1204
            else:
1205
                dataframe = strategy.dp.get_pair_dataframe(pair=pair, timeframe=tf)
1✔
1206
                return dataframe
1✔
1207
        else:
1208
            dataframe = base_dataframes[tf]
1✔
1209
            if not dataframe.empty:
1✔
1210
                return dataframe
1✔
1211
            else:
1212
                dataframe = strategy.dp.get_pair_dataframe(pair=pair, timeframe=tf)
1✔
1213
                return dataframe
1✔
1214
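
# Illustrative, standalone sketch (hypothetical fetch function): the cache-or-fetch
# pattern used above -- return the cached per-timeframe dataframe when it is non-empty,
# otherwise fall back to the data provider (stubbed here as fetch_candles()).
import pandas as pd

def fetch_candles(pair: str, tf: str) -> pd.DataFrame:
    # stand-in for strategy.dp.get_pair_dataframe(pair=pair, timeframe=tf)
    return pd.DataFrame({"date": [], "close": []})

def get_cached_or_fetch(cache: dict, pair: str, tf: str) -> pd.DataFrame:
    cached = cache.get(tf, pd.DataFrame())
    if not cached.empty:
        return cached
    return fetch_candles(pair, tf)

base_dataframes = {"5m": pd.DataFrame({"date": [0], "close": [1.0]})}
df = get_cached_or_fetch(base_dataframes, "BTC/USDT", "5m")  # served from the cache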

1215
    def merge_features(self, df_main: DataFrame, df_to_merge: DataFrame,
1✔
1216
                       tf: str, timeframe_inf: str, suffix: str) -> DataFrame:
1217
        """
1218
        Merge the features of the dataframe and remove HLCV and date added columns
1219
        :param df_main: DataFrame = main dataframe
1220
        :param df_to_merge: DataFrame = dataframe to merge
1221
        :param tf: str = timeframe of the main dataframe
1222
        :param timeframe_inf: str = timeframe of the dataframe to merge
1223
        :param suffix: str = suffix to add to the columns of the dataframe to merge
1224
        :return: dataframe = merged dataframe
1225
        """
1226
        dataframe = merge_informative_pair(df_main, df_to_merge, tf, timeframe_inf=timeframe_inf,
1✔
1227
                                           append_timeframe=False, suffix=suffix, ffill=True)
1228
        skip_columns = [
1✔
1229
            (f"{s}_{suffix}") for s in ["date", "open", "high", "low", "close", "volume"]
1230
        ]
1231
        dataframe = dataframe.drop(columns=skip_columns)
1✔
1232
        return dataframe
1✔
1233
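
# Illustrative, standalone sketch (plain pandas, not merge_informative_pair itself):
# merging a feature frame onto the main frame with a per-merge suffix, then dropping
# the suffixed date/OHLCV columns -- the same cleanup merge_features() performs after
# the informative merge.
import pandas as pd

dates = pd.date_range("2022-01-01", periods=3, freq="1h")
df_main = pd.DataFrame({"date": dates, "close": [1.0, 1.1, 1.2]})
df_features = pd.DataFrame({"date": dates, "close": [1.0, 1.1, 1.2], "%-rsi": [40.0, 45.0, 50.0]})

suffix = "10"
df_features = df_features.add_suffix(f"_{suffix}")
merged = df_main.merge(df_features, how="left", left_on="date", right_on=f"date_{suffix}")

skip_columns = [f"{s}_{suffix}" for s in ["date", "open", "high", "low", "close", "volume"]]
merged = merged.drop(columns=[c for c in skip_columns if c in merged.columns])
# merged keeps 'date', 'close' and the suffixed feature column '%-rsi_10'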

1234
    def populate_features(self, dataframe: DataFrame, pair: str, strategy: IStrategy,
1✔
1235
                          corr_dataframes: dict, base_dataframes: dict,
1236
                          is_corr_pairs: bool = False) -> DataFrame:
1237
        """
1238
        Use the user defined strategy functions for populating features
1239
        :param dataframe: DataFrame = dataframe to populate
1240
        :param pair: str = pair to populate
1241
        :param strategy: IStrategy = user defined strategy object
1242
        :param corr_dataframes: dict = dict containing the df pair dataframes
1243
        :param base_dataframes: dict = dict containing the current pair dataframes
1244
        :param is_corr_pairs: bool = whether the pair is a corr pair or not
1245
        :return: dataframe = populated dataframe
1246
        """
1247
        tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")
1✔
1248

1249
        for tf in tfs:
1✔
1250
            informative_df = self.get_pair_data_for_features(
1✔
1251
                pair, tf, strategy, corr_dataframes, base_dataframes, is_corr_pairs)
1252
            informative_copy = informative_df.copy()
1✔
1253

1254
            for t in self.freqai_config["feature_parameters"]["indicator_periods_candles"]:
1✔
1255
                df_features = strategy.feature_engineering_expand_all(
1✔
1256
                    informative_copy.copy(), t)
1257
                suffix = f"{t}"
1✔
1258
                informative_df = self.merge_features(informative_df, df_features, tf, tf, suffix)
1✔
1259

1260
            generic_df = strategy.feature_engineering_expand_basic(informative_copy.copy())
1✔
1261
            suffix = "gen"
1✔
1262

1263
            informative_df = self.merge_features(informative_df, generic_df, tf, tf, suffix)
1✔
1264

1265
            indicators = [col for col in informative_df if col.startswith("%")]
1✔
1266
            for n in range(self.freqai_config["feature_parameters"]["include_shifted_candles"] + 1):
1✔
1267
                if n == 0:
1✔
1268
                    continue
1✔
1269
                df_shift = informative_df[indicators].shift(n)
1✔
1270
                df_shift = df_shift.add_suffix("_shift-" + str(n))
1✔
1271
                informative_df = pd.concat((informative_df, df_shift), axis=1)
1✔
1272

1273
            dataframe = self.merge_features(dataframe.copy(), informative_df,
1✔
1274
                                            self.config["timeframe"], tf, f'{pair}_{tf}')
1275

1276
        return dataframe
1✔
1277
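
# Illustrative, standalone sketch (hypothetical frame): how include_shifted_candles
# duplicates every '%' feature column shifted back by n candles, so each row also
# carries the recent history of its indicators.
import pandas as pd

informative_df = pd.DataFrame({
    "date": pd.date_range("2022-01-01", periods=4, freq="1h"),
    "%-rsi_10": [40.0, 45.0, 50.0, 55.0],
})

include_shifted_candles = 2
indicators = [col for col in informative_df if col.startswith("%")]
for n in range(1, include_shifted_candles + 1):
    df_shift = informative_df[indicators].shift(n).add_suffix(f"_shift-{n}")
    informative_df = pd.concat((informative_df, df_shift), axis=1)
# columns: '%-rsi_10', '%-rsi_10_shift-1', '%-rsi_10_shift-2' (shifted rows start as NaN)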

1278
    def use_strategy_to_populate_indicators(
1✔
1279
        self,
1280
        strategy: IStrategy,
1281
        corr_dataframes: dict = {},
1282
        base_dataframes: dict = {},
1283
        pair: str = "",
1284
        prediction_dataframe: DataFrame = pd.DataFrame(),
1285
        do_corr_pairs: bool = True,
1286
    ) -> DataFrame:
1287
        """
1288
        Use the user defined strategy for populating indicators during retrain
1289
        :param strategy: IStrategy = user defined strategy object
1290
        :param corr_dataframes: dict = dict containing the df pair dataframes
1291
                                (for user defined timeframes)
1292
        :param base_dataframes: dict = dict containing the current pair dataframes
1293
                                (for user defined timeframes)
1294
        :param pair: str = pair to populate
1295
        :param prediction_dataframe: DataFrame = dataframe containing the pair data
1296
        used for prediction
1297
        :param do_corr_pairs: bool = whether to populate corr pairs or not
1298
        :return:
1299
        dataframe: DataFrame = dataframe containing populated indicators
1300
        """
1301

1302
        # hack: check whether the user still overrides the deprecated populate_any_indicators function
1303
        new_version = inspect.getsource(strategy.populate_any_indicators) == (
1✔
1304
            inspect.getsource(IStrategy.populate_any_indicators))
1305

1306
        if new_version:
1✔
1307
            tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")
1✔
1308
            pairs: List[str] = self.freqai_config["feature_parameters"].get(
1✔
1309
                "include_corr_pairlist", [])
1310

1311
            for tf in tfs:
1✔
1312
                if tf not in base_dataframes:
1✔
1313
                    base_dataframes[tf] = pd.DataFrame()
1✔
1314
                for p in pairs:
1✔
1315
                    if p not in corr_dataframes:
1✔
1316
                        corr_dataframes[p] = {}
1✔
1317
                    if tf not in corr_dataframes[p]:
1✔
1318
                        corr_dataframes[p][tf] = pd.DataFrame()
1✔
1319

1320
            if not prediction_dataframe.empty:
1✔
1321
                dataframe = prediction_dataframe.copy()
1✔
1322
            else:
1323
                dataframe = base_dataframes[self.config["timeframe"]].copy()
1✔
1324

1325
            corr_pairs: List[str] = self.freqai_config["feature_parameters"].get(
1✔
1326
                "include_corr_pairlist", [])
1327
            dataframe = self.populate_features(dataframe.copy(), pair, strategy,
1✔
1328
                                               corr_dataframes, base_dataframes)
1329

1330
            dataframe = strategy.feature_engineering_standard(dataframe.copy())
1✔
1331
            # ensure corr pairs are always last
1332
            for corr_pair in corr_pairs:
1✔
1333
                if pair == corr_pair:
1✔
1334
                    continue  # don't repeat anything from whitelist
1✔
1335
                if corr_pairs and do_corr_pairs:
1✔
1336
                    dataframe = self.populate_features(dataframe.copy(), corr_pair, strategy,
1✔
1337
                                                       corr_dataframes, base_dataframes, True)
1338

1339
            dataframe = strategy.set_freqai_targets(dataframe.copy())
1✔
1340

1341
            self.get_unique_classes_from_labels(dataframe)
1✔
1342

1343
            dataframe = self.remove_special_chars_from_feature_names(dataframe)
1✔
1344

1345
            if self.config.get('reduce_df_footprint', False):
1✔
1346
                dataframe = reduce_dataframe_footprint(dataframe)
1✔
1347

1348
            return dataframe
1✔
1349

1350
        else:
1351
            # the user is using the deprecated populate_any_indicators function
1352

1353
            df = self.use_strategy_to_populate_indicators_old_version(
×
1354
                strategy, corr_dataframes, base_dataframes, pair,
1355
                prediction_dataframe, do_corr_pairs)
1356
            return df
×
1357
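
# Illustrative, standalone sketch (hypothetical classes, save and run as a script so
# inspect can read the source): detecting whether a subclass overrides a base-class
# method by comparing source text with inspect.getsource(), the same trick used above
# to choose between the new feature pipeline and the deprecated one.
import inspect

class Base:
    def populate_any_indicators(self):
        return None

class UserStrategy(Base):
    def populate_any_indicators(self):
        return "custom"

uses_default = (inspect.getsource(UserStrategy.populate_any_indicators)
                == inspect.getsource(Base.populate_any_indicators))
# uses_default is False here because UserStrategy overrides the method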

1358
    def use_strategy_to_populate_indicators_old_version(
1✔
1359
        self,
1360
        strategy: IStrategy,
1361
        corr_dataframes: dict = {},
1362
        base_dataframes: dict = {},
1363
        pair: str = "",
1364
        prediction_dataframe: DataFrame = pd.DataFrame(),
1365
        do_corr_pairs: bool = True,
1366
    ) -> DataFrame:
1367
        """
1368
        Use the user defined strategy for populating indicators during retrain
1369
        :param strategy: IStrategy = user defined strategy object
1370
        :param corr_dataframes: dict = dict containing the df pair dataframes
1371
                                (for user defined timeframes)
1372
        :param base_dataframes: dict = dict containing the current pair dataframes
1373
                                (for user defined timeframes)
1374
        :param pair: str = pair to populate
1375
        :return:
1376
        dataframe: DataFrame = dataframe containing populated indicators
1377
        """
1378

1379
        # for prediction dataframe creation, we let dataprovider handle everything in the strategy
1380
        # so we create empty dictionaries, which allows us to pass None to
1381
        # `populate_any_indicators()`, signaling that we want the dp to give us the live dataframe.
1382
        tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")
×
1383
        pairs: List[str] = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
×
1384
        if not prediction_dataframe.empty:
×
1385
            dataframe = prediction_dataframe.copy()
×
1386
            for tf in tfs:
×
1387
                base_dataframes[tf] = None
×
1388
                for p in pairs:
×
1389
                    if p not in corr_dataframes:
×
1390
                        corr_dataframes[p] = {}
×
1391
                    corr_dataframes[p][tf] = None
×
1392
        else:
1393
            dataframe = base_dataframes[self.config["timeframe"]].copy()
×
1394

1395
        sgi = False
×
1396
        for tf in tfs:
×
1397
            if tf == tfs[-1]:
×
1398
                sgi = True  # doing this last allows user to use all tf raw prices in labels
×
1399
            dataframe = strategy.populate_any_indicators(
×
1400
                pair,
1401
                dataframe.copy(),
1402
                tf,
1403
                informative=base_dataframes[tf],
1404
                set_generalized_indicators=sgi
1405
            )
1406

1407
        # ensure corr pairs are always last
1408
        for corr_pair in pairs:
×
1409
            if pair == corr_pair:
×
1410
                continue  # don't repeat anything from whitelist
×
1411
            for tf in tfs:
×
1412
                if pairs and do_corr_pairs:
×
1413
                    dataframe = strategy.populate_any_indicators(
×
1414
                        corr_pair,
1415
                        dataframe.copy(),
1416
                        tf,
1417
                        informative=corr_dataframes[corr_pair][tf]
1418
                    )
1419

1420
        self.get_unique_classes_from_labels(dataframe)
×
1421

1422
        dataframe = self.remove_special_chars_from_feature_names(dataframe)
×
1423

1424
        if self.config.get('reduce_df_footprint', False):
×
1425
            dataframe = reduce_dataframe_footprint(dataframe)
×
1426

1427
        return dataframe
×
1428

1429
    def fit_labels(self) -> None:
1✔
1430
        """
1431
        Fit the labels with a gaussian distribution
1432
        """
1433
        import scipy as spy
1✔
1434

1435
        self.data["labels_mean"], self.data["labels_std"] = {}, {}
1✔
1436
        for label in self.data_dictionary["train_labels"].columns:
1✔
1437
            if self.data_dictionary["train_labels"][label].dtype == object:
1✔
1438
                continue
1✔
1439
            f = spy.stats.norm.fit(self.data_dictionary["train_labels"][label])
1✔
1440
            self.data["labels_mean"][label], self.data["labels_std"][label] = f[0], f[1]
1✔
1441

1442
        # in case targets are classifications
1443
        for label in self.unique_class_list:
1✔
1444
            self.data["labels_mean"][label], self.data["labels_std"][label] = 0, 0
1✔
1445

1446
        return
1✔
1447
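
# Illustrative, standalone sketch (hypothetical labels): fitting a gaussian to each
# numeric label column with scipy.stats.norm.fit(), which returns the (mean, std) pair
# that fit_labels() stores per label; object-dtype (classification) labels are skipped.
import numpy as np
import pandas as pd
from scipy import stats

train_labels = pd.DataFrame({
    "&-target_return": np.random.default_rng(42).normal(0.0, 0.02, 500),
    "&-target_class": ["up", "down"] * 250,
})

labels_mean, labels_std = {}, {}
for label in train_labels.columns:
    if train_labels[label].dtype == object:
        continue
    mean, std = stats.norm.fit(train_labels[label])
    labels_mean[label], labels_std[label] = mean, std
# labels_mean['&-target_return'] is close to 0.0 and labels_std close to 0.02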

1448
    def remove_features_from_df(self, dataframe: DataFrame) -> DataFrame:
1✔
1449
        """
1450
        Remove the features from the dataframe before returning it to strategy. This keeps it
1451
        compact for FreqUI purposes.
1452
        """
1453
        to_keep = [
×
1454
            col for col in dataframe.columns if not col.startswith("%") or col.startswith("%%")
1455
        ]
1456
        return dataframe[to_keep]
×
1457
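
# Illustrative, standalone sketch (hypothetical columns): dropping raw '%' feature
# columns before the frame is handed back to the strategy, while '%%' columns survive
# the filter.
import pandas as pd

df = pd.DataFrame({"close": [1.0], "%-rsi_10": [40.0], "%%-custom_plot": [0.5]})
to_keep = [col for col in df.columns
           if not col.startswith("%") or col.startswith("%%")]
slim_df = df[to_keep]
# to_keep == ['close', '%%-custom_plot']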

1458
    def get_unique_classes_from_labels(self, dataframe: DataFrame) -> None:
1✔
1459

1460
        # self.find_features(dataframe)
1461
        self.find_labels(dataframe)
1✔
1462

1463
        for key in self.label_list:
1✔
1464
            if dataframe[key].dtype == object:
1✔
1465
                self.unique_classes[key] = dataframe[key].dropna().unique()
1✔
1466

1467
        if self.unique_classes:
1✔
1468
            for label in self.unique_classes:
1✔
1469
                self.unique_class_list += list(self.unique_classes[label])
1✔
1470

1471
    def save_backtesting_prediction(
1✔
1472
        self, append_df: DataFrame
1473
    ) -> None:
1474
        """
1475
        Save prediction dataframe from backtesting to feather file format
1476
        :param append_df: dataframe for backtesting period
1477
        """
1478
        full_predictions_folder = Path(self.full_path / self.backtest_predictions_folder)
1✔
1479
        if not full_predictions_folder.is_dir():
1✔
1480
            full_predictions_folder.mkdir(parents=True, exist_ok=True)
1✔
1481

1482
        append_df.to_feather(self.backtesting_results_path)
1✔
1483

1484
    def get_backtesting_prediction(
1✔
1485
        self
1486
    ) -> DataFrame:
1487
        """
1488
        Get prediction dataframe from feather file format
1489
        """
1490
        append_df = pd.read_feather(self.backtesting_results_path)
1✔
1491
        return append_df
1✔
1492
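
# Illustrative, standalone sketch (temporary file, pyarrow required): the feather round
# trip used for backtesting predictions -- DataFrame.to_feather() to persist the frame
# and pandas.read_feather() to load it back unchanged.
import tempfile
from pathlib import Path

import pandas as pd

append_df = pd.DataFrame({
    "date": pd.date_range("2022-01-01", periods=3, freq="1h"),
    "&-prediction": [0.01, -0.02, 0.03],
})

with tempfile.TemporaryDirectory() as tmp:
    predictions_file = Path(tmp) / "cb_btc_1234_prediction.feather"
    append_df.to_feather(predictions_file)
    restored = pd.read_feather(predictions_file)

assert restored.equals(append_df)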

1493
    def check_if_backtest_prediction_is_valid(
1✔
1494
        self,
1495
        len_backtest_df: int
1496
    ) -> bool:
1497
        """
1498
        Check if a backtesting prediction already exists and if the predictions
1499
        to append have the same size as the backtesting dataframe slice
1500
        :param len_backtest_df: Length of backtesting dataframe slice
1501
        :return:
1502
        :boolean: whether the prediction file is valid.
1503
        """
1504
        path_to_predictionfile = Path(self.full_path /
1✔
1505
                                      self.backtest_predictions_folder /
1506
                                      f"{self.model_filename}_prediction.feather")
1507
        self.backtesting_results_path = path_to_predictionfile
1✔
1508

1509
        file_exists = path_to_predictionfile.is_file()
1✔
1510

1511
        if file_exists:
1✔
1512
            append_df = self.get_backtesting_prediction()
1✔
1513
            if len(append_df) == len_backtest_df and 'date' in append_df:
1✔
1514
                logger.info(f"Found backtesting prediction file at {path_to_predictionfile}")
1✔
1515
                return True
1✔
1516
            else:
1517
                logger.info("A new backtesting prediction file is required. "
×
1518
                            "(Number of predictions is different from dataframe length or "
1519
                            "old prediction file version).")
1520
                return False
×
1521
        else:
1522
            logger.info(
1✔
1523
                f"Could not find backtesting prediction file at {path_to_predictionfile}"
1524
            )
1525
            return False
1✔
1526

1527
    def get_full_models_path(self, config: Config) -> Path:
1✔
1528
        """
1529
        Returns default FreqAI model path
1530
        :param config: Configuration dictionary
1531
        """
1532
        freqai_config: Dict[str, Any] = config["freqai"]
1✔
1533
        return Path(
1✔
1534
            config["user_data_dir"] / "models" / str(freqai_config.get("identifier"))
1535
        )
1536

1537
    def remove_special_chars_from_feature_names(self, dataframe: pd.DataFrame) -> pd.DataFrame:
1✔
1538
        """
1539
        Remove all special characters from feature strings (:)
1540
        :param dataframe: the dataframe that just finished indicator population. (unfiltered)
1541
        :return: dataframe with cleaned feature names
1542
        """
1543

1544
        spec_chars = [':']
1✔
1545
        for c in spec_chars:
1✔
1546
            dataframe.columns = dataframe.columns.str.replace(c, "")
1✔
1547

1548
        return dataframe
1✔
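
# Illustrative, standalone sketch (hypothetical columns): stripping characters such as
# ':' from feature names with the vectorised .str.replace on the column index, the same
# cleanup remove_special_chars_from_feature_names() applies before training.
import pandas as pd

df = pd.DataFrame({"%-BTC/USDT:USDT_rsi_5m": [40.0, 45.0]})
for char in [":"]:
    df.columns = df.columns.str.replace(char, "")
# columns -> ['%-BTC/USDTUSDT_rsi_5m']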