# freqtrade/freqai/data_kitchen.py
import copy
import inspect
import logging
import random
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import numpy.typing as npt
import pandas as pd
import psutil
from datasieve.pipeline import Pipeline
from pandas import DataFrame
from sklearn.model_selection import train_test_split

from freqtrade.configuration import TimeRange
from freqtrade.constants import DOCS_LINK, Config
from freqtrade.data.converter import reduce_dataframe_footprint
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.strategy import merge_informative_pair
from freqtrade.strategy.interface import IStrategy


pd.set_option('future.no_silent_downcasting', True)

SECONDS_IN_DAY = 86400
SECONDS_IN_HOUR = 3600

logger = logging.getLogger(__name__)


class FreqaiDataKitchen:
    """
    Class designed to analyze data for a single pair. Employed by the IFreqaiModel class.
    Functionalities include holding, saving, loading, and analyzing the data.

    This object is not persistent; it is reinstantiated for each coin, each time the coin
    model needs to be inferenced or trained.

    Record of contribution:
    FreqAI was developed by a group of individuals who all contributed specific skillsets to the
    project.

    Conception and software development:
    Robert Caulk @robcaulk

    Theoretical brainstorming:
    Elin Törnquist @th0rntwig

    Code review, software architecture brainstorming:
    @xmatthias

    Beta testing and bug reporting:
    @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm
    Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
    """

    def __init__(
        self,
        config: Config,
        live: bool = False,
        pair: str = "",
    ):
        self.data: Dict[str, Any] = {}
        self.data_dictionary: Dict[str, DataFrame] = {}
        self.config = config
        self.freqai_config: Dict[str, Any] = config["freqai"]
        self.full_df: DataFrame = DataFrame()
        self.append_df: DataFrame = DataFrame()
        self.data_path = Path()
        self.label_list: List = []
        self.training_features_list: List = []
        self.model_filename: str = ""
        self.backtesting_results_path = Path()
        self.backtest_predictions_folder: str = "backtesting_predictions"
        self.live = live
        self.pair = pair
        self.keras: bool = self.freqai_config.get("keras", False)
        self.set_all_pairs()
        self.backtest_live_models = config.get("freqai_backtest_live_models", False)
        self.feature_pipeline = Pipeline()
        self.label_pipeline = Pipeline()
        self.DI_values: npt.NDArray = np.array([])

        if not self.live:
            self.full_path = self.get_full_models_path(self.config)

            if not self.backtest_live_models:
                self.full_timerange = self.create_fulltimerange(
                    self.config["timerange"], self.freqai_config.get("train_period_days", 0)
                )
                (self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
                    self.full_timerange,
                    config["freqai"]["train_period_days"],
                    config["freqai"]["backtest_period_days"],
                )

        self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})
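        # Fall back to a CPU-derived default when no explicit thread count is set;
        # psutil.cpu_count() counts logical CPUs, so e.g. a 4-core/8-thread machine
        # gets max(8 * 2 - 2, 1) = 14 worker threads.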
        if not self.freqai_config.get("data_kitchen_thread_count", 0):
            self.thread_count = max(int(psutil.cpu_count() * 2 - 2), 1)
        else:
            self.thread_count = self.freqai_config["data_kitchen_thread_count"]
        self.train_dates: DataFrame = pd.DataFrame()
        self.unique_classes: Dict[str, list] = {}
        self.unique_class_list: list = []
        self.backtest_live_models_data: Dict[str, Any] = {}

    def set_paths(
        self,
        pair: str,
        trained_timestamp: Optional[int] = None,
    ) -> None:
        """
        Set the paths to the data for the present coin/botloop
        :param pair: str = pair for which to set the data path
        :param trained_timestamp: int = timestamp of most recent training
        """
        self.full_path = self.get_full_models_path(self.config)
        self.data_path = Path(
            self.full_path
            / f"sub-train-{pair.split('/')[0]}_{trained_timestamp}"
        )

        return

    def make_train_test_datasets(
        self, filtered_dataframe: DataFrame, labels: DataFrame
    ) -> Dict[Any, Any]:
        """
        Given the dataframe for the full history for training, split the data into
        training and test data according to user-specified parameters in the
        configuration file.
        :param filtered_dataframe: cleaned dataframe ready to be split.
        :param labels: cleaned labels ready to be split.
        """
        feat_dict = self.freqai_config["feature_parameters"]

        if 'shuffle' not in self.freqai_config['data_split_parameters']:
            self.freqai_config["data_split_parameters"].update({'shuffle': False})

        weights: npt.ArrayLike
        if feat_dict.get("weight_factor", 0) > 0:
            weights = self.set_weights_higher_recent(len(filtered_dataframe))
        else:
            weights = np.ones(len(filtered_dataframe))

        if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
            (
                train_features,
                test_features,
                train_labels,
                test_labels,
                train_weights,
                test_weights,
            ) = train_test_split(
                filtered_dataframe[: filtered_dataframe.shape[0]],
                labels,
                weights,
                **self.config["freqai"]["data_split_parameters"],
            )
        else:
            test_labels = np.zeros(2)
            test_features = pd.DataFrame()
            test_weights = np.zeros(2)
            train_features = filtered_dataframe
            train_labels = labels
            train_weights = weights

        if feat_dict["shuffle_after_split"]:
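            # One seed per dataset: shuffling features, labels, and weights with the
            # same random_state keeps the rows of each set aligned with one another.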
            rint1 = random.randint(0, 100)
            rint2 = random.randint(0, 100)
            train_features = train_features.sample(
                frac=1, random_state=rint1).reset_index(drop=True)
            train_labels = train_labels.sample(frac=1, random_state=rint1).reset_index(drop=True)
            train_weights = pd.DataFrame(train_weights).sample(
                frac=1, random_state=rint1).reset_index(drop=True).to_numpy()[:, 0]
            test_features = test_features.sample(frac=1, random_state=rint2).reset_index(drop=True)
            test_labels = test_labels.sample(frac=1, random_state=rint2).reset_index(drop=True)
            test_weights = pd.DataFrame(test_weights).sample(
                frac=1, random_state=rint2).reset_index(drop=True).to_numpy()[:, 0]

        # Simplest way to reverse the order of training and test data:
        if self.freqai_config['feature_parameters'].get('reverse_train_test_order', False):
            return self.build_data_dictionary(
                test_features, train_features, test_labels,
                train_labels, test_weights, train_weights
            )
        else:
            return self.build_data_dictionary(
                train_features, test_features, train_labels,
                test_labels, train_weights, test_weights
            )

    def filter_features(
        self,
        unfiltered_df: DataFrame,
        training_feature_list: List,
        label_list: List = list(),
        training_filter: bool = True,
    ) -> Tuple[DataFrame, DataFrame]:
        """
        Filter the unfiltered dataframe to extract the user-requested features/labels and properly
        remove all NaNs. Any row with a NaN is removed from the training dataset or replaced with
        0s in the prediction dataset. However, the prediction dataset's do_predict will reflect any
        row that had a NaN and will shield the user from that prediction.

        :param unfiltered_df: the full dataframe for the present training period
        :param training_feature_list: list, the training feature list constructed by
                                      self.build_feature_list() according to user-specified
                                      parameters in the configuration file.
        :param label_list: the labels for the dataset
        :param training_filter: boolean which lets the function know if it is training data or
                                prediction data to be filtered.
        :returns:
        :filtered_df: dataframe cleaned of NaNs and only containing the user-
        requested feature set.
        :labels: labels cleaned of NaNs.
        """
        filtered_df = unfiltered_df.filter(training_feature_list, axis=1)
        filtered_df = filtered_df.replace([np.inf, -np.inf], np.nan)

        drop_index = pd.isnull(filtered_df).any(axis=1)  # get the rows that have NaNs,
        drop_index = drop_index.replace(True, 1).replace(False, 0).infer_objects(copy=False)
        if training_filter:

            # we don't care about total row number (total no. datapoints) in training, we only care
            # about removing any row with NaNs
            # if labels has multiple columns (user wants to train multiple models), we detect here
            labels = unfiltered_df.filter(label_list, axis=1)
            drop_index_labels = pd.isnull(labels).any(axis=1)
            drop_index_labels = drop_index_labels.replace(
                True, 1
            ).replace(False, 0).infer_objects(copy=False)
            dates = unfiltered_df['date']
            filtered_df = filtered_df[
                (drop_index == 0) & (drop_index_labels == 0)
            ]  # dropping values
            labels = labels[
                (drop_index == 0) & (drop_index_labels == 0)
            ]  # assuming the labels depend entirely on the dataframe here.
            self.train_dates = dates[
                (drop_index == 0) & (drop_index_labels == 0)
            ]
            logger.info(
                f"{self.pair}: dropped {len(unfiltered_df) - len(filtered_df)} training points"
                f" due to NaNs in populated dataset {len(unfiltered_df)}."
            )
            if len(filtered_df) == 0 and not self.live:
                raise OperationalException(
                    f"{self.pair}: all training data dropped due to NaNs. "
                    "You likely did not download enough training data prior "
                    "to your backtest timerange. Hint:\n"
                    f"{DOCS_LINK}/freqai-running/"
                    "#downloading-data-to-cover-the-full-backtest-period"
                )
            if (1 - len(filtered_df) / len(unfiltered_df)) > 0.1 and self.live:
                worst_indicator = str(unfiltered_df.count().idxmin())
                logger.warning(
                    f" {(1 - len(filtered_df) / len(unfiltered_df)) * 100:.0f} percent "
                    " of training data dropped due to NaNs, the model may perform "
                    f"inconsistently with expectations. Verify {worst_indicator}"
                )
            self.data["filter_drop_index_training"] = drop_index

        else:

            # we are backtesting so we need to preserve row number to send back to strategy,
            # so now we use do_predict to avoid any prediction based on a NaN
            drop_index = pd.isnull(filtered_df).any(axis=1)
            self.data["filter_drop_index_prediction"] = drop_index
            filtered_df.fillna(0, inplace=True)
            # replacing all NaNs with zeros to avoid issues in 'prediction', but any prediction
            # that was based on a single NaN is ultimately protected from buys with do_predict
            drop_index = ~drop_index
            self.do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
            if (len(self.do_predict) - self.do_predict.sum()) > 0:
                logger.info(
                    "dropped %s of %s prediction data points due to NaNs.",
                    len(self.do_predict) - self.do_predict.sum(),
                    len(filtered_df),
                )
            labels = []

        return filtered_df, labels

    def build_data_dictionary(
        self,
        train_df: DataFrame,
        test_df: DataFrame,
        train_labels: DataFrame,
        test_labels: DataFrame,
        train_weights: Any,
        test_weights: Any,
    ) -> Dict:

        self.data_dictionary = {
            "train_features": train_df,
            "test_features": test_df,
            "train_labels": train_labels,
            "test_labels": test_labels,
            "train_weights": train_weights,
            "test_weights": test_weights,
            "train_dates": self.train_dates
        }

        return self.data_dictionary

    def split_timerange(
        self, tr: str, train_split: int = 28, bt_split: float = 7
    ) -> Tuple[list, list]:
        """
        Function which takes a single timerange (tr) and splits it
        into sub timeranges to train and backtest on, based on user input
        tr: str, full timerange to train on
        train_split: the period length of each training window (days). Specified in the user
        configuration file
        bt_split: the backtesting length (days). Specified in the user configuration file
        """

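        # e.g. train_split=28, bt_split=7: window 1 trains on days 1-28 and
        # backtests on days 29-35, window 2 trains on days 8-35 and backtests
        # on days 36-42, sliding forward until the config timerange's end.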
        if not isinstance(train_split, int) or train_split < 1:
            raise OperationalException(
                f"train_period_days must be an integer greater than 0. Got {train_split}."
            )
        train_period_days = train_split * SECONDS_IN_DAY
        bt_period = bt_split * SECONDS_IN_DAY

        full_timerange = TimeRange.parse_timerange(tr)
        config_timerange = TimeRange.parse_timerange(self.config["timerange"])
        if config_timerange.stopts == 0:
            config_timerange.stopts = int(
                datetime.now(tz=timezone.utc).timestamp()
            )
        timerange_train = copy.deepcopy(full_timerange)
        timerange_backtest = copy.deepcopy(full_timerange)

        tr_training_list = []
        tr_backtesting_list = []
        tr_training_list_timerange = []
        tr_backtesting_list_timerange = []
        first = True

        while True:
            if not first:
                timerange_train.startts = timerange_train.startts + int(bt_period)
            timerange_train.stopts = timerange_train.startts + train_period_days

            first = False
            tr_training_list.append(timerange_train.timerange_str)
            tr_training_list_timerange.append(copy.deepcopy(timerange_train))

            # associated backtest period
            timerange_backtest.startts = timerange_train.stopts
            timerange_backtest.stopts = timerange_backtest.startts + int(bt_period)

            if timerange_backtest.stopts > config_timerange.stopts:
                timerange_backtest.stopts = config_timerange.stopts

            tr_backtesting_list.append(timerange_backtest.timerange_str)
            tr_backtesting_list_timerange.append(copy.deepcopy(timerange_backtest))

            # ensure we are predicting on exactly the same amount of data as requested
            # by the user-defined --timerange
            if timerange_backtest.stopts == config_timerange.stopts:
                break

        return tr_training_list_timerange, tr_backtesting_list_timerange

    def slice_dataframe(self, timerange: TimeRange, df: DataFrame) -> DataFrame:
        """
        Given a full dataframe, extract the user-desired window
        :param timerange: TimeRange = the window we wish to extract from df
        :param df: Dataframe containing all candles to run the entire backtest. Here
                   it is sliced down to just the present training period.
        """
        if not self.live:
            df = df.loc[(df["date"] >= timerange.startdt) & (df["date"] < timerange.stopdt), :]
        else:
            df = df.loc[df["date"] >= timerange.startdt, :]

        return df

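    # FreqAI column naming convention: feature columns contain "%" and label
    # (target) columns contain "&"; both are created in the user's strategy.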
    def find_features(self, dataframe: DataFrame) -> None:
        """
        Find features in the strategy-provided dataframe
        :param dataframe: DataFrame = strategy-provided dataframe
        :return:
        features: list = the features to be used for training/prediction
        """
        column_names = dataframe.columns
        features = [c for c in column_names if "%" in c]

        if not features:
            raise OperationalException("Could not find any features!")

        self.training_features_list = features

    def find_labels(self, dataframe: DataFrame) -> None:
        column_names = dataframe.columns
        labels = [c for c in column_names if "&" in c]
        self.label_list = labels

    def set_weights_higher_recent(self, num_weights: int) -> npt.ArrayLike:
        """
        Set weights so that recent data is more heavily weighted during
        training than older data.
        """
        wfactor = self.config["freqai"]["feature_parameters"]["weight_factor"]
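        # Exponentially decaying weights, reversed so the newest row gets weight
        # 1.0 and the oldest roughly exp(-1 / wfactor), e.g. ~0.14 for wfactor=0.5.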
        weights = np.exp(-np.arange(num_weights) / (wfactor * num_weights))[::-1]
        return weights

    def get_predictions_to_append(self, predictions: DataFrame,
                                  do_predict: npt.ArrayLike,
                                  dataframe_backtest: DataFrame) -> DataFrame:
        """
        Get backtest predictions from the current backtest period
        """

        append_df = DataFrame()
        for label in predictions.columns:
            append_df[label] = predictions[label]
            if append_df[label].dtype == object:
                continue
            if "labels_mean" in self.data:
                append_df[f"{label}_mean"] = self.data["labels_mean"][label]
            if "labels_std" in self.data:
                append_df[f"{label}_std"] = self.data["labels_std"][label]

        for extra_col in self.data["extra_returns_per_train"]:
            append_df[f"{extra_col}"] = self.data["extra_returns_per_train"][extra_col]

        append_df["do_predict"] = do_predict
        if self.freqai_config["feature_parameters"].get("DI_threshold", 0) > 0:
            append_df["DI_values"] = self.DI_values

1✔
440
        cols = ["date"]
1✔
441
        cols.extend(user_cols)
1✔
442

443
        dataframe_backtest.reset_index(drop=True, inplace=True)
1✔
444
        merged_df = pd.concat([dataframe_backtest[cols], append_df], axis=1)
1✔
445
        return merged_df
1✔
446

447
    def append_predictions(self, append_df: DataFrame) -> None:
1✔
448
        """
449
        Append backtest prediction from current backtest period to all previous periods
450
        """
451

452
        if self.full_df.empty:
1✔
453
            self.full_df = append_df
1✔
454
        else:
455
            self.full_df = pd.concat([self.full_df, append_df], axis=0, ignore_index=True)
1✔
456

    def fill_predictions(self, dataframe):
        """
        Back-fill values to before the backtesting range so that the dataframe matches size
        when it goes back to the strategy. These rows are not included in the backtest.
        """
        to_keep = [col for col in dataframe.columns if
                   not col.startswith("&") and not col.startswith("%%")]
        self.return_dataframe = pd.merge(dataframe[to_keep],
                                         self.full_df, how='left', on='date')
        self.return_dataframe[self.full_df.columns] = (
            self.return_dataframe[self.full_df.columns].fillna(value=0))
        self.full_df = DataFrame()

        return

    def create_fulltimerange(self, backtest_tr: str, backtest_period_days: int) -> str:

        if not isinstance(backtest_period_days, int):
            raise OperationalException("backtest_period_days must be an integer")

        if backtest_period_days < 0:
            raise OperationalException("backtest_period_days must be positive")

        backtest_timerange = TimeRange.parse_timerange(backtest_tr)

        if backtest_timerange.stopts == 0:
            # Typically open-ended timeranges do work; however, there are edge cases
            # where they do not. Accommodating those edge cases just to allow
            # open-ended timeranges is not a high enough priority to warrant the
            # effort. It is safer for now to simply ask the user for an end date.
            raise OperationalException("FreqAI backtesting does not allow open-ended timeranges. "
                                       "Please indicate the end date of your desired backtesting "
                                       "timerange.")
            # backtest_timerange.stopts = int(
            #     datetime.now(tz=timezone.utc).timestamp()
            # )

        backtest_timerange.startts = (
            backtest_timerange.startts - backtest_period_days * SECONDS_IN_DAY
        )
        full_timerange = backtest_timerange.timerange_str
        config_path = Path(self.config["config_files"][0])

        if not self.full_path.is_dir():
            self.full_path.mkdir(parents=True, exist_ok=True)
            shutil.copy(
                config_path.resolve(),
                Path(self.full_path / config_path.parts[-1]),
            )

        return full_timerange

    def check_if_model_expired(self, trained_timestamp: int) -> bool:
        """
        A model age checker to determine if the model is trustworthy based on the user-defined
        `expiration_hours` in the configuration file.
        :param trained_timestamp: int = The time of training for the most recent model.
        :return:
            bool = If the model is expired or not.
        """
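        # e.g. with expiration_hours=1, a model trained 90 minutes ago is considered
        # expired; expiration_hours=0 (the default) disables the check entirely.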
        time = datetime.now(tz=timezone.utc).timestamp()
        elapsed_time = (time - trained_timestamp) / SECONDS_IN_HOUR  # hours
        max_time = self.freqai_config.get("expiration_hours", 0)
        if max_time > 0:
            return elapsed_time > max_time
        else:
            return False

    def check_if_new_training_required(
        self, trained_timestamp: int
    ) -> Tuple[bool, TimeRange, TimeRange]:

        time = datetime.now(tz=timezone.utc).timestamp()
        trained_timerange = TimeRange()
        data_load_timerange = TimeRange()

        timeframes = self.freqai_config["feature_parameters"].get("include_timeframes")

        max_tf_seconds = 0
        for tf in timeframes:
            secs = timeframe_to_seconds(tf)
            if secs > max_tf_seconds:
                max_tf_seconds = secs

        # We notice that users like to use exotic indicators where
        # they do not know the required timeperiod. Here we include a factor
        # of safety by multiplying the user's assumed "max" by 2.
        max_period = self.config.get('startup_candle_count', 20) * 2
        additional_seconds = max_period * max_tf_seconds

        if trained_timestamp != 0:
            elapsed_time = (time - trained_timestamp) / SECONDS_IN_HOUR
            retrain = elapsed_time > self.freqai_config.get("live_retrain_hours", 0)
            if retrain:
                trained_timerange.startts = int(
                    time - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
                )
                trained_timerange.stopts = int(time)
                # we want to load/populate indicators on more data than we plan to train on,
                # because most of the indicators have a rolling timeperiod and are thus NaN
                # unless they have data further back in time before the start of the train period
                data_load_timerange.startts = int(
                    time
                    - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
                    - additional_seconds
                )
                data_load_timerange.stopts = int(time)
        else:  # user passed no live_trained_timerange in config
            trained_timerange.startts = int(
                time - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
            )
            trained_timerange.stopts = int(time)

            data_load_timerange.startts = int(
                time
                - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
                - additional_seconds
            )
            data_load_timerange.stopts = int(time)
            retrain = True

        return retrain, trained_timerange, data_load_timerange

    def set_new_model_names(self, pair: str, timestamp_id: int):

        coin, _ = pair.split("/")
        self.data_path = Path(
            self.full_path
            / f"sub-train-{pair.split('/')[0]}_{timestamp_id}"
        )

        self.model_filename = f"cb_{coin.lower()}_{timestamp_id}"

    def set_all_pairs(self) -> None:

        self.all_pairs = copy.deepcopy(
            self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
        )
        for pair in self.config.get("exchange", {}).get("pair_whitelist"):
            if pair not in self.all_pairs:
                self.all_pairs.append(pair)

    def extract_corr_pair_columns_from_populated_indicators(
        self,
        dataframe: DataFrame
    ) -> Dict[str, DataFrame]:
        """
        Find the columns of the dataframe corresponding to the corr_pairlist, save them
        in a dictionary to be reused and attached to other pairs.

        :param dataframe: fully populated dataframe (current pair + corr_pairs)
        :return: corr_dataframes, dictionary of dataframes to be attached
                 to other pairs in the same candle.
        """
        corr_dataframes: Dict[str, DataFrame] = {}
        pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])

        for pair in pairs:
            pair = pair.replace(':', '')  # lightgbm does not like colons
            pair_cols = [col for col in dataframe.columns if col.startswith("%")
                         and f"{pair}_" in col]

            if pair_cols:
                pair_cols.insert(0, 'date')
                corr_dataframes[pair] = dataframe.filter(pair_cols, axis=1)

        return corr_dataframes

    def attach_corr_pair_columns(self, dataframe: DataFrame,
                                 corr_dataframes: Dict[str, DataFrame],
                                 current_pair: str) -> DataFrame:
        """
        Attach the existing corr_pair dataframes to the current pair dataframe before training

        :param dataframe: current pair strategy dataframe, indicators populated already
        :param corr_dataframes: dictionary of saved dataframes from earlier in the same candle
        :param current_pair: current pair to which we will attach corr pair dataframe
        :return:
        :dataframe: current pair dataframe of populated indicators, concatenated with corr_pairs
                    ready for training
        """
        pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
        current_pair = current_pair.replace(':', '')
        for pair in pairs:
            pair = pair.replace(':', '')  # lightgbm does not work with colons
            if current_pair != pair:
                dataframe = dataframe.merge(corr_dataframes[pair], how='left', on='date')

        return dataframe

    def get_pair_data_for_features(self,
                                   pair: str,
                                   tf: str,
                                   strategy: IStrategy,
                                   corr_dataframes: dict = {},
                                   base_dataframes: dict = {},
                                   is_corr_pairs: bool = False) -> DataFrame:
        """
        Get the data for the pair. If it's not in the dictionary, get it from the data provider
        :param pair: str = pair to get data for
        :param tf: str = timeframe to get data for
        :param strategy: IStrategy = user-defined strategy object
        :param corr_dataframes: dict = dict containing the corr pair dataframes
                                (for user-defined timeframes)
        :param base_dataframes: dict = dict containing the current pair dataframes
                                (for user-defined timeframes)
        :param is_corr_pairs: bool = whether the pair is a corr pair or not
        :return: dataframe = dataframe containing the pair data
        """
        if is_corr_pairs:
            dataframe = corr_dataframes[pair][tf]
            if not dataframe.empty:
                return dataframe
            else:
                dataframe = strategy.dp.get_pair_dataframe(pair=pair, timeframe=tf)
                return dataframe
        else:
            dataframe = base_dataframes[tf]
            if not dataframe.empty:
                return dataframe
            else:
                dataframe = strategy.dp.get_pair_dataframe(pair=pair, timeframe=tf)
                return dataframe

    def merge_features(self, df_main: DataFrame, df_to_merge: DataFrame,
                       tf: str, timeframe_inf: str, suffix: str) -> DataFrame:
        """
        Merge the features of the dataframe and remove the HLCV and date columns added
        by the merge
        :param df_main: DataFrame = main dataframe
        :param df_to_merge: DataFrame = dataframe to merge
        :param tf: str = timeframe of the main dataframe
        :param timeframe_inf: str = timeframe of the dataframe to merge
        :param suffix: str = suffix to add to the columns of the dataframe to merge
        :return: dataframe = merged dataframe
        """
        dataframe = merge_informative_pair(df_main, df_to_merge, tf, timeframe_inf=timeframe_inf,
                                           append_timeframe=False, suffix=suffix, ffill=True)
        skip_columns = [
            f"{s}_{suffix}" for s in ["date", "open", "high", "low", "close", "volume"]
        ]
        dataframe = dataframe.drop(columns=skip_columns)
        return dataframe

    def populate_features(self, dataframe: DataFrame, pair: str, strategy: IStrategy,
                          corr_dataframes: dict, base_dataframes: dict,
                          is_corr_pairs: bool = False) -> DataFrame:
        """
        Use the user-defined strategy functions for populating features
        :param dataframe: DataFrame = dataframe to populate
        :param pair: str = pair to populate
        :param strategy: IStrategy = user-defined strategy object
        :param corr_dataframes: dict = dict containing the corr pair dataframes
        :param base_dataframes: dict = dict containing the current pair dataframes
        :param is_corr_pairs: bool = whether the pair is a corr pair or not
        :return: dataframe = populated dataframe
        """
        tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")

        for tf in tfs:
            metadata = {"pair": pair, "tf": tf}
            informative_df = self.get_pair_data_for_features(
                pair, tf, strategy, corr_dataframes, base_dataframes, is_corr_pairs)
            informative_copy = informative_df.copy()

            logger.debug(f"Populating features for {pair} {tf}")

            for t in self.freqai_config["feature_parameters"]["indicator_periods_candles"]:
                df_features = strategy.feature_engineering_expand_all(
                    informative_copy.copy(), t, metadata=metadata)
                suffix = f"{t}"
                informative_df = self.merge_features(informative_df, df_features, tf, tf, suffix)

            generic_df = strategy.feature_engineering_expand_basic(
                informative_copy.copy(), metadata=metadata)
            suffix = "gen"

            informative_df = self.merge_features(informative_df, generic_df, tf, tf, suffix)

            indicators = [col for col in informative_df if col.startswith("%")]
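            # include_shifted_candles=N appends N lagged copies of every feature
            # column (suffixed "_shift-1" ... "_shift-N") so the model sees recent history.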
            n_shifts = self.freqai_config["feature_parameters"]["include_shifted_candles"]
            for n in range(1, n_shifts + 1):
                df_shift = informative_df[indicators].shift(n)
                df_shift = df_shift.add_suffix("_shift-" + str(n))
                informative_df = pd.concat((informative_df, df_shift), axis=1)

            dataframe = self.merge_features(dataframe.copy(), informative_df,
                                            self.config["timeframe"], tf, f'{pair}_{tf}')

        return dataframe

    def use_strategy_to_populate_indicators(  # noqa: C901
        self,
        strategy: IStrategy,
        corr_dataframes: dict = {},
        base_dataframes: dict = {},
        pair: str = "",
        prediction_dataframe: DataFrame = pd.DataFrame(),
        do_corr_pairs: bool = True,
    ) -> DataFrame:
        """
        Use the user-defined strategy for populating indicators during retrain
        :param strategy: IStrategy = user-defined strategy object
        :param corr_dataframes: dict = dict containing the corr pair dataframes
                                (for user-defined timeframes)
        :param base_dataframes: dict = dict containing the current pair dataframes
                                (for user-defined timeframes)
        :param pair: str = pair to populate
        :param prediction_dataframe: DataFrame = dataframe containing the pair data
        used for prediction
        :param do_corr_pairs: bool = whether to populate corr pairs or not
        :return:
        dataframe: DataFrame = dataframe containing populated indicators
        """

        # check if the user is using the deprecated populate_any_indicators function
        new_version = inspect.getsource(strategy.populate_any_indicators) == (
            inspect.getsource(IStrategy.populate_any_indicators))

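        # (A strategy that still overrides populate_any_indicators has source that
        # differs from the base IStrategy stub; that difference is what the
        # comparison above detects.)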
        if not new_version:
            raise OperationalException(
                "You are using the `populate_any_indicators()` function"
                " which was deprecated on March 1, 2023. Please refer "
                "to the strategy migration guide to use the new "
                "feature_engineering_* methods: \n"
                f"{DOCS_LINK}/strategy_migration/#freqai-strategy \n"
                "And the feature_engineering_* documentation: \n"
                f"{DOCS_LINK}/freqai-feature-engineering/"
            )

        tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")
        pairs: List[str] = self.freqai_config["feature_parameters"].get(
            "include_corr_pairlist", [])

        for tf in tfs:
            if tf not in base_dataframes:
                base_dataframes[tf] = pd.DataFrame()
            for p in pairs:
                if p not in corr_dataframes:
                    corr_dataframes[p] = {}
                if tf not in corr_dataframes[p]:
                    corr_dataframes[p][tf] = pd.DataFrame()

        if not prediction_dataframe.empty:
            dataframe = prediction_dataframe.copy()
            base_dataframes[self.config["timeframe"]] = dataframe.copy()
        else:
            dataframe = base_dataframes[self.config["timeframe"]].copy()

        corr_pairs: List[str] = self.freqai_config["feature_parameters"].get(
            "include_corr_pairlist", [])
        dataframe = self.populate_features(dataframe.copy(), pair, strategy,
                                           corr_dataframes, base_dataframes)
        metadata = {"pair": pair}
        dataframe = strategy.feature_engineering_standard(dataframe.copy(), metadata=metadata)
        # ensure corr pairs are always last
        for corr_pair in corr_pairs:
            if pair == corr_pair:
                continue  # don't repeat anything from the whitelist
            if corr_pairs and do_corr_pairs:
                dataframe = self.populate_features(dataframe.copy(), corr_pair, strategy,
                                                   corr_dataframes, base_dataframes, True)

        if self.live:
            dataframe = strategy.set_freqai_targets(dataframe.copy(), metadata=metadata)
            dataframe = self.remove_special_chars_from_feature_names(dataframe)

        self.get_unique_classes_from_labels(dataframe)

        if self.config.get('reduce_df_footprint', False):
            dataframe = reduce_dataframe_footprint(dataframe)

        return dataframe

    def fit_labels(self) -> None:
        """
        Fit the labels with a Gaussian distribution
        """
        import scipy as spy

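        # norm.fit returns (mean, std) for each label column; these are stored so
        # predictions can later be read relative to the training distribution.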
        self.data["labels_mean"], self.data["labels_std"] = {}, {}
1✔
838
        for label in self.data_dictionary["train_labels"].columns:
1✔
839
            if self.data_dictionary["train_labels"][label].dtype == object:
1✔
840
                continue
1✔
841
            f = spy.stats.norm.fit(self.data_dictionary["train_labels"][label])
1✔
842
            self.data["labels_mean"][label], self.data["labels_std"][label] = f[0], f[1]
1✔
843

844
        # in case targets are classifications
845
        for label in self.unique_class_list:
1✔
846
            self.data["labels_mean"][label], self.data["labels_std"][label] = 0, 0
1✔
847

848
        return
1✔
849

    def remove_features_from_df(self, dataframe: DataFrame) -> DataFrame:
        """
        Remove the features from the dataframe before returning it to the strategy. This keeps it
        compact for FreqUI purposes.
        """
        to_keep = [
            col for col in dataframe.columns if not col.startswith("%") or col.startswith("%%")
        ]
        return dataframe[to_keep]

    def get_unique_classes_from_labels(self, dataframe: DataFrame) -> None:

        self.find_labels(dataframe)

        for key in self.label_list:
            if dataframe[key].dtype == object:
                self.unique_classes[key] = dataframe[key].dropna().unique()

        if self.unique_classes:
            for label in self.unique_classes:
                self.unique_class_list += list(self.unique_classes[label])

    def save_backtesting_prediction(
        self, append_df: DataFrame
    ) -> None:
        """
        Save prediction dataframe from backtesting to feather file format
        :param append_df: dataframe for backtesting period
        """
        full_predictions_folder = Path(self.full_path / self.backtest_predictions_folder)
        if not full_predictions_folder.is_dir():
            full_predictions_folder.mkdir(parents=True, exist_ok=True)

        append_df.to_feather(self.backtesting_results_path)

    def get_backtesting_prediction(
        self
    ) -> DataFrame:
        """
        Get prediction dataframe from feather file format
        """
        append_df = pd.read_feather(self.backtesting_results_path)
        return append_df

    def check_if_backtest_prediction_is_valid(
        self,
        len_backtest_df: int
    ) -> bool:
        """
        Check if a backtesting prediction already exists and if the predictions
        to append have the same size as the backtesting dataframe slice
        :param len_backtest_df: Length of the backtesting dataframe slice
        :return:
        :boolean: whether the prediction file is valid.
        """
        path_to_predictionfile = Path(self.full_path /
                                      self.backtest_predictions_folder /
                                      f"{self.model_filename}_prediction.feather")
        self.backtesting_results_path = path_to_predictionfile

        file_exists = path_to_predictionfile.is_file()

        if file_exists:
            append_df = self.get_backtesting_prediction()
            if len(append_df) == len_backtest_df and 'date' in append_df:
                logger.info(f"Found backtesting prediction file at {path_to_predictionfile}")
                return True
            else:
                logger.info("A new backtesting prediction file is required. "
                            "(Number of predictions is different from dataframe length or "
                            "old prediction file version).")
                return False
        else:
            logger.info(
                f"Could not find backtesting prediction file at {path_to_predictionfile}"
            )
            return False

    def get_full_models_path(self, config: Config) -> Path:
1✔
930
        """
931
        Returns default FreqAI model path
932
        :param config: Configuration dictionary
933
        """
934
        freqai_config: Dict[str, Any] = config["freqai"]
1✔
935
        return Path(
1✔
936
            config["user_data_dir"] / "models" / str(freqai_config.get("identifier"))
937
        )
938

939
    def remove_special_chars_from_feature_names(self, dataframe: pd.DataFrame) -> pd.DataFrame:
1✔
940
        """
941
        Remove all special characters from feature strings (:)
942
        :param dataframe: the dataframe that just finished indicator population. (unfiltered)
943
        :return: dataframe with cleaned featrue names
944
        """
945

946
        spec_chars = [':']
1✔
947
        for c in spec_chars:
1✔
948
            dataframe.columns = dataframe.columns.str.replace(c, "")
1✔
949

950
        return dataframe
1✔
951

    def buffer_timerange(self, timerange: TimeRange):
        """
        Buffer the start and end of the timerange. This is used *after* the indicators
        are populated.

        The main example use is when predicting maxima and minima: the argrelextrema
        function cannot know the maxima/minima at the edges of the timerange. To improve
        model accuracy, it is best to compute argrelextrema on the full timerange
        and then use this function to cut off the edges (buffer) by the kernel.

        In another case, if the targets are set to a shifted price movement, this
        buffer is unnecessary because the shifted candles at the end of the timerange
        will be NaN and FreqAI will automatically cut those off the training
        dataset.
        """
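        # e.g. buffer_train_data_candles=20 on a 5m timeframe trims
        # 20 * 300 s = 100 minutes from each end of the timerange.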
        buffer = self.freqai_config["feature_parameters"]["buffer_train_data_candles"]
        if buffer:
            timerange.stopts -= buffer * timeframe_to_seconds(self.config["timeframe"])
            timerange.startts += buffer * timeframe_to_seconds(self.config["timeframe"])

        return timerange

    # deprecated functions
    def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
        """
        Deprecation warning, migration assistance
        """
        logger.warning("Your custom IFreqaiModel relies on the deprecated"
                       " data pipeline. Please update your model to use the new data pipeline."
                       " This can be achieved by following the migration guide at "
                       f"{DOCS_LINK}/strategy_migration/#freqai-new-data-pipeline "
                       "We added a basic pipeline for you, but this will be removed "
                       "in a future version.")

        return data_dictionary

    def denormalize_labels_from_metadata(self, df: DataFrame) -> DataFrame:
        """
        Deprecation warning, migration assistance
        """
        logger.warning("Your custom IFreqaiModel relies on the deprecated"
                       " data pipeline. Please update your model to use the new data pipeline."
                       " This can be achieved by following the migration guide at "
                       f"{DOCS_LINK}/strategy_migration/#freqai-new-data-pipeline "
                       "We added a basic pipeline for you, but this will be removed "
                       "in a future version.")

        pred_df, _, _ = self.label_pipeline.inverse_transform(df)

        return pred_df