• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 6181253459

08 Sep 2023 06:04AM UTC coverage: 94.614% (+0.06%) from 94.556%
6181253459

push

github-actions

web-flow
Merge pull request #9159 from stash86/fix-adjust

remove old codes when we only can do partial entries

2 of 2 new or added lines in 1 file covered. (100.0%)

19114 of 20202 relevant lines covered (94.61%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.21
/freqtrade/freqai/data_kitchen.py
1
import copy
1✔
2
import inspect
1✔
3
import logging
1✔
4
import random
1✔
5
import shutil
1✔
6
from datetime import datetime, timezone
1✔
7
from pathlib import Path
1✔
8
from typing import Any, Dict, List, Optional, Tuple
1✔
9

10
import numpy as np
1✔
11
import numpy.typing as npt
1✔
12
import pandas as pd
1✔
13
import psutil
1✔
14
from datasieve.pipeline import Pipeline
1✔
15
from pandas import DataFrame
1✔
16
from sklearn.model_selection import train_test_split
1✔
17

18
from freqtrade.configuration import TimeRange
1✔
19
from freqtrade.constants import DOCS_LINK, Config
1✔
20
from freqtrade.data.converter import reduce_dataframe_footprint
1✔
21
from freqtrade.exceptions import OperationalException
1✔
22
from freqtrade.exchange import timeframe_to_seconds
1✔
23
from freqtrade.strategy import merge_informative_pair
1✔
24
from freqtrade.strategy.interface import IStrategy
1✔
25

26

27
# Conversion factors used to turn hour/day based settings into seconds.
SECONDS_IN_HOUR = 60 * 60
SECONDS_IN_DAY = 24 * SECONDS_IN_HOUR

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
31

32

33
class FreqaiDataKitchen:
1✔
34
    """
35
    Class designed to analyze data for a single pair. Employed by the IFreqaiModel class.
36
    Functionalities include holding, saving, loading, and analyzing the data.
37

38
    This object is not persistent, it is reinstantiated for each coin, each time the coin
39
    model needs to be inferenced or trained.
40

41
    Record of contribution:
42
    FreqAI was developed by a group of individuals who all contributed specific skillsets to the
43
    project.
44

45
    Conception and software development:
46
    Robert Caulk @robcaulk
47

48
    Theoretical brainstorming:
49
    Elin Törnquist @th0rntwig
50

51
    Code review, software architecture brainstorming:
52
    @xmatthias
53

54
    Beta testing and bug reporting:
55
    @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm
56
    Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
57
    """
58

59
    def __init__(
        self,
        config: Config,
        live: bool = False,
        pair: str = "",
    ):
        """
        Initialise the per-pair containers and, when not running live, the model
        path and the training/backtesting timeranges.

        :param config: full configuration; must contain a "freqai" section.
        :param live: when False, the full models path is resolved and (unless
                     "freqai_backtest_live_models" is set) the timeranges are split.
        :param pair: pair this instance is working on.
        """
        self.data: Dict[str, Any] = {}
        self.data_dictionary: Dict[str, DataFrame] = {}
        self.config = config
        self.freqai_config: Dict[str, Any] = config["freqai"]
        self.full_df: DataFrame = DataFrame()
        self.append_df: DataFrame = DataFrame()
        self.data_path = Path()
        self.label_list: List = []
        self.training_features_list: List = []
        self.model_filename: str = ""
        self.backtesting_results_path = Path()
        self.backtest_predictions_folder: str = "backtesting_predictions"
        self.live = live
        self.pair = pair
        self.keras: bool = self.freqai_config.get("keras", False)
        # needs self.config and self.freqai_config, so it must run after they are set
        self.set_all_pairs()
        self.backtest_live_models = config.get("freqai_backtest_live_models", False)
        self.feature_pipeline = Pipeline()
        self.label_pipeline = Pipeline()
        self.DI_values: npt.NDArray = np.array([])

        if not self.live:
            self.full_path = self.get_full_models_path(self.config)

            if not self.backtest_live_models:
                # the full timerange is the configured timerange extended backwards by
                # train_period_days
                self.full_timerange = self.create_fulltimerange(
                    self.config["timerange"], self.freqai_config.get("train_period_days", 0)
                )
                (self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
                    self.full_timerange,
                    config["freqai"]["train_period_days"],
                    config["freqai"]["backtest_period_days"],
                )

        self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})

        configured_threads = self.freqai_config.get("data_kitchen_thread_count", 0)
        if configured_threads:
            self.thread_count = configured_threads
        else:
            # psutil.cpu_count() returns None when the count cannot be determined;
            # fall back to a single CPU instead of failing on `None * 2`.
            cpu_count = psutil.cpu_count() or 1
            self.thread_count = max(int(cpu_count * 2 - 2), 1)

        self.train_dates: DataFrame = pd.DataFrame()
        self.unique_classes: Dict[str, list] = {}
        self.unique_class_list: list = []
        self.backtest_live_models_data: Dict[str, Any] = {}
108

109
    def set_paths(
1✔
110
        self,
111
        pair: str,
112
        trained_timestamp: Optional[int] = None,
113
    ) -> None:
114
        """
115
        Set the paths to the data for the present coin/botloop
116
        :param metadata: dict = strategy furnished pair metadata
117
        :param trained_timestamp: int = timestamp of most recent training
118
        """
119
        self.full_path = self.get_full_models_path(self.config)
1✔
120
        self.data_path = Path(
1✔
121
            self.full_path
122
            / f"sub-train-{pair.split('/')[0]}_{trained_timestamp}"
123
        )
124

125
        return
1✔
126

127
    def make_train_test_datasets(
1✔
128
        self, filtered_dataframe: DataFrame, labels: DataFrame
129
    ) -> Dict[Any, Any]:
130
        """
131
        Given the dataframe for the full history for training, split the data into
132
        training and test data according to user specified parameters in configuration
133
        file.
134
        :param filtered_dataframe: cleaned dataframe ready to be split.
135
        :param labels: cleaned labels ready to be split.
136
        """
137
        feat_dict = self.freqai_config["feature_parameters"]
1✔
138

139
        if 'shuffle' not in self.freqai_config['data_split_parameters']:
1✔
140
            self.freqai_config["data_split_parameters"].update({'shuffle': False})
×
141

142
        weights: npt.ArrayLike
143
        if feat_dict.get("weight_factor", 0) > 0:
1✔
144
            weights = self.set_weights_higher_recent(len(filtered_dataframe))
1✔
145
        else:
146
            weights = np.ones(len(filtered_dataframe))
×
147

148
        if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
1✔
149
            (
1✔
150
                train_features,
151
                test_features,
152
                train_labels,
153
                test_labels,
154
                train_weights,
155
                test_weights,
156
            ) = train_test_split(
157
                filtered_dataframe[: filtered_dataframe.shape[0]],
158
                labels,
159
                weights,
160
                **self.config["freqai"]["data_split_parameters"],
161
            )
162
        else:
163
            test_labels = np.zeros(2)
×
164
            test_features = pd.DataFrame()
×
165
            test_weights = np.zeros(2)
×
166
            train_features = filtered_dataframe
×
167
            train_labels = labels
×
168
            train_weights = weights
×
169

170
        if feat_dict["shuffle_after_split"]:
1✔
171
            rint1 = random.randint(0, 100)
1✔
172
            rint2 = random.randint(0, 100)
1✔
173
            train_features = train_features.sample(
1✔
174
                frac=1, random_state=rint1).reset_index(drop=True)
175
            train_labels = train_labels.sample(frac=1, random_state=rint1).reset_index(drop=True)
1✔
176
            train_weights = pd.DataFrame(train_weights).sample(
1✔
177
                frac=1, random_state=rint1).reset_index(drop=True).to_numpy()[:, 0]
178
            test_features = test_features.sample(frac=1, random_state=rint2).reset_index(drop=True)
1✔
179
            test_labels = test_labels.sample(frac=1, random_state=rint2).reset_index(drop=True)
1✔
180
            test_weights = pd.DataFrame(test_weights).sample(
1✔
181
                frac=1, random_state=rint2).reset_index(drop=True).to_numpy()[:, 0]
182

183
        # Simplest way to reverse the order of training and test data:
184
        if self.freqai_config['feature_parameters'].get('reverse_train_test_order', False):
1✔
185
            return self.build_data_dictionary(
×
186
                test_features, train_features, test_labels,
187
                train_labels, test_weights, train_weights
188
                )
189
        else:
190
            return self.build_data_dictionary(
1✔
191
                train_features, test_features, train_labels,
192
                test_labels, train_weights, test_weights
193
            )
194

195
    def filter_features(
1✔
196
        self,
197
        unfiltered_df: DataFrame,
198
        training_feature_list: List,
199
        label_list: List = list(),
200
        training_filter: bool = True,
201
    ) -> Tuple[DataFrame, DataFrame]:
202
        """
203
        Filter the unfiltered dataframe to extract the user requested features/labels and properly
204
        remove all NaNs. Any row with a NaN is removed from training dataset or replaced with
205
        0s in the prediction dataset. However, prediction dataset do_predict will reflect any
206
        row that had a NaN and will shield user from that prediction.
207

208
        :param unfiltered_df: the full dataframe for the present training period
209
        :param training_feature_list: list, the training feature list constructed by
210
                                      self.build_feature_list() according to user specified
211
                                      parameters in the configuration file.
212
        :param labels: the labels for the dataset
213
        :param training_filter: boolean which lets the function know if it is training data or
214
                                prediction data to be filtered.
215
        :returns:
216
        :filtered_df: dataframe cleaned of NaNs and only containing the user
217
        requested feature set.
218
        :labels: labels cleaned of NaNs.
219
        """
220
        filtered_df = unfiltered_df.filter(training_feature_list, axis=1)
1✔
221
        filtered_df = filtered_df.replace([np.inf, -np.inf], np.nan)
1✔
222

223
        drop_index = pd.isnull(filtered_df).any(axis=1)  # get the rows that have NaNs,
1✔
224
        drop_index = drop_index.replace(True, 1).replace(False, 0)  # pep8 requirement.
1✔
225
        if (training_filter):
1✔
226

227
            # we don't care about total row number (total no. datapoints) in training, we only care
228
            # about removing any row with NaNs
229
            # if labels has multiple columns (user wants to train multiple modelEs), we detect here
230
            labels = unfiltered_df.filter(label_list, axis=1)
1✔
231
            drop_index_labels = pd.isnull(labels).any(axis=1)
1✔
232
            drop_index_labels = drop_index_labels.replace(True, 1).replace(False, 0)
1✔
233
            dates = unfiltered_df['date']
1✔
234
            filtered_df = filtered_df[
1✔
235
                (drop_index == 0) & (drop_index_labels == 0)
236
            ]  # dropping values
237
            labels = labels[
1✔
238
                (drop_index == 0) & (drop_index_labels == 0)
239
            ]  # assuming the labels depend entirely on the dataframe here.
240
            self.train_dates = dates[
1✔
241
                (drop_index == 0) & (drop_index_labels == 0)
242
            ]
243
            logger.info(
1✔
244
                f"{self.pair}: dropped {len(unfiltered_df) - len(filtered_df)} training points"
245
                f" due to NaNs in populated dataset {len(unfiltered_df)}."
246
            )
247
            if (1 - len(filtered_df) / len(unfiltered_df)) > 0.1 and self.live:
1✔
248
                worst_indicator = str(unfiltered_df.count().idxmin())
×
249
                logger.warning(
×
250
                    f" {(1 - len(filtered_df)/len(unfiltered_df)) * 100:.0f} percent "
251
                    " of training data dropped due to NaNs, model may perform inconsistent "
252
                    f"with expectations. Verify {worst_indicator}"
253
                )
254
            self.data["filter_drop_index_training"] = drop_index
1✔
255

256
        else:
257

258
            # we are backtesting so we need to preserve row number to send back to strategy,
259
            # so now we use do_predict to avoid any prediction based on a NaN
260
            drop_index = pd.isnull(filtered_df).any(axis=1)
1✔
261
            self.data["filter_drop_index_prediction"] = drop_index
1✔
262
            filtered_df.fillna(0, inplace=True)
1✔
263
            # replacing all NaNs with zeros to avoid issues in 'prediction', but any prediction
264
            # that was based on a single NaN is ultimately protected from buys with do_predict
265
            drop_index = ~drop_index
1✔
266
            self.do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
1✔
267
            if (len(self.do_predict) - self.do_predict.sum()) > 0:
1✔
268
                logger.info(
×
269
                    "dropped %s of %s prediction data points due to NaNs.",
270
                    len(self.do_predict) - self.do_predict.sum(),
271
                    len(filtered_df),
272
                )
273
            labels = []
1✔
274

275
        return filtered_df, labels
1✔
276

277
    def build_data_dictionary(
1✔
278
        self,
279
        train_df: DataFrame,
280
        test_df: DataFrame,
281
        train_labels: DataFrame,
282
        test_labels: DataFrame,
283
        train_weights: Any,
284
        test_weights: Any,
285
    ) -> Dict:
286

287
        self.data_dictionary = {
1✔
288
            "train_features": train_df,
289
            "test_features": test_df,
290
            "train_labels": train_labels,
291
            "test_labels": test_labels,
292
            "train_weights": train_weights,
293
            "test_weights": test_weights,
294
            "train_dates": self.train_dates
295
        }
296

297
        return self.data_dictionary
1✔
298

299
    def split_timerange(
        self, tr: str, train_split: int = 28, bt_split: float = 7
    ) -> Tuple[list, list]:
        """
        Function which takes a single time range (tr) and splits it
        into sub timeranges to train and backtest on based on user input
        :param tr: str, full timerange to train on
        :param train_split: the period length for the each training (days). Specified in user
                            configuration file
        :param bt_split: the backtesting length (days). Specified in user configuration file
        :return: (list of training TimeRanges, list of matching backtesting TimeRanges)
        :raises OperationalException: if train_split is not an integer >= 1, or if bt_split
                                      is too small for the sliding window to ever advance.
        """

        if not isinstance(train_split, int) or train_split < 1:
            raise OperationalException(
                f"train_period_days must be an integer greater than 0. Got {train_split}."
            )
        train_period_seconds = train_split * SECONDS_IN_DAY
        bt_period_seconds = int(bt_split * SECONDS_IN_DAY)

        full_timerange = TimeRange.parse_timerange(tr)
        config_timerange = TimeRange.parse_timerange(self.config["timerange"])
        if config_timerange.stopts == 0:
            # open ended timerange in the config: stop at "now"
            config_timerange.stopts = int(
                datetime.now(tz=timezone.utc).timestamp()
            )
        timerange_train = copy.deepcopy(full_timerange)
        timerange_backtest = copy.deepcopy(full_timerange)

        training_timeranges = []
        backtesting_timeranges = []
        first = True

        while True:
            # every window after the first one is shifted forward by one backtest period
            if not first:
                timerange_train.startts = timerange_train.startts + bt_period_seconds
            timerange_train.stopts = timerange_train.startts + train_period_seconds

            first = False
            training_timeranges.append(copy.deepcopy(timerange_train))

            # associated backtest period
            timerange_backtest.startts = timerange_train.stopts
            timerange_backtest.stopts = timerange_backtest.startts + bt_period_seconds

            if timerange_backtest.stopts > config_timerange.stopts:
                timerange_backtest.stopts = config_timerange.stopts

            backtesting_timeranges.append(copy.deepcopy(timerange_backtest))

            # ensure we are predicting on exactly same amount of data as requested by user defined
            #  --timerange
            if timerange_backtest.stopts == config_timerange.stopts:
                break

            # Without a positive step the window never moves forward and this loop
            # would never reach the end of the configured timerange.
            if bt_period_seconds < 1:
                raise OperationalException(
                    f"backtest_period_days must be greater than 0. Got {bt_split}."
                )

        return training_timeranges, backtesting_timeranges
359

360
    def slice_dataframe(self, timerange: TimeRange, df: DataFrame) -> DataFrame:
1✔
361
        """
362
        Given a full dataframe, extract the user desired window
363
        :param tr: timerange string that we wish to extract from df
364
        :param df: Dataframe containing all candles to run the entire backtest. Here
365
                   it is sliced down to just the present training period.
366
        """
367
        if not self.live:
1✔
368
            df = df.loc[(df["date"] >= timerange.startdt) & (df["date"] < timerange.stopdt), :]
1✔
369
        else:
370
            df = df.loc[df["date"] >= timerange.startdt, :]
1✔
371

372
        return df
1✔
373

374
    def find_features(self, dataframe: DataFrame) -> None:
1✔
375
        """
376
        Find features in the strategy provided dataframe
377
        :param dataframe: DataFrame = strategy provided dataframe
378
        :return:
379
        features: list = the features to be used for training/prediction
380
        """
381
        column_names = dataframe.columns
1✔
382
        features = [c for c in column_names if "%" in c]
1✔
383

384
        if not features:
1✔
385
            raise OperationalException("Could not find any features!")
×
386

387
        self.training_features_list = features
1✔
388

389
    def find_labels(self, dataframe: DataFrame) -> None:
1✔
390
        column_names = dataframe.columns
1✔
391
        labels = [c for c in column_names if "&" in c]
1✔
392
        self.label_list = labels
1✔
393

394
    def set_weights_higher_recent(self, num_weights: int) -> npt.ArrayLike:
1✔
395
        """
396
        Set weights so that recent data is more heavily weighted during
397
        training than older data.
398
        """
399
        wfactor = self.config["freqai"]["feature_parameters"]["weight_factor"]
1✔
400
        weights = np.exp(-np.arange(num_weights) / (wfactor * num_weights))[::-1]
1✔
401
        return weights
1✔
402

403
    def get_predictions_to_append(self, predictions: DataFrame,
1✔
404
                                  do_predict: npt.ArrayLike,
405
                                  dataframe_backtest: DataFrame) -> DataFrame:
406
        """
407
        Get backtest prediction from current backtest period
408
        """
409

410
        append_df = DataFrame()
1✔
411
        for label in predictions.columns:
1✔
412
            append_df[label] = predictions[label]
1✔
413
            if append_df[label].dtype == object:
1✔
414
                continue
1✔
415
            if "labels_mean" in self.data:
1✔
416
                append_df[f"{label}_mean"] = self.data["labels_mean"][label]
1✔
417
            if "labels_std" in self.data:
1✔
418
                append_df[f"{label}_std"] = self.data["labels_std"][label]
1✔
419

420
        for extra_col in self.data["extra_returns_per_train"]:
1✔
421
            append_df[f"{extra_col}"] = self.data["extra_returns_per_train"][extra_col]
×
422

423
        append_df["do_predict"] = do_predict
1✔
424
        if self.freqai_config["feature_parameters"].get("DI_threshold", 0) > 0:
1✔
425
            append_df["DI_values"] = self.DI_values
1✔
426

427
        dataframe_backtest.reset_index(drop=True, inplace=True)
1✔
428
        merged_df = pd.concat([dataframe_backtest["date"], append_df], axis=1)
1✔
429
        return merged_df
1✔
430

431
    def append_predictions(self, append_df: DataFrame) -> None:
1✔
432
        """
433
        Append backtest prediction from current backtest period to all previous periods
434
        """
435

436
        if self.full_df.empty:
1✔
437
            self.full_df = append_df
1✔
438
        else:
439
            self.full_df = pd.concat([self.full_df, append_df], axis=0, ignore_index=True)
1✔
440

441
    def fill_predictions(self, dataframe):
1✔
442
        """
443
        Back fill values to before the backtesting range so that the dataframe matches size
444
        when it goes back to the strategy. These rows are not included in the backtest.
445
        """
446
        to_keep = [col for col in dataframe.columns if not col.startswith("&")]
1✔
447
        self.return_dataframe = pd.merge(dataframe[to_keep],
1✔
448
                                         self.full_df, how='left', on='date')
449
        self.return_dataframe[self.full_df.columns] = (
1✔
450
            self.return_dataframe[self.full_df.columns].fillna(value=0))
451
        self.full_df = DataFrame()
1✔
452

453
        return
1✔
454

455
    def create_fulltimerange(self, backtest_tr: str, backtest_period_days: int) -> str:
        """
        Extend the start of the given timerange backwards by a number of days and make
        sure the models folder exists (copying the first config file into it when the
        folder is created).
        :param backtest_tr: timerange string to extend
        :param backtest_period_days: number of days the start is moved back
        :return: the extended timerange as string
        :raises OperationalException: for a non-integer or negative day count, or an
                                      open ended timerange
        """
        if not isinstance(backtest_period_days, int):
            raise OperationalException("backtest_period_days must be an integer")

        if backtest_period_days < 0:
            raise OperationalException("backtest_period_days must be positive")

        timerange = TimeRange.parse_timerange(backtest_tr)

        if timerange.stopts == 0:
            # typically open ended time ranges do work, however, there are some edge cases where
            # it does not. accommodating these kinds of edge cases just to allow open-ended
            # timerange is not high enough priority to warrant the effort. It is safer for now
            # to simply ask user to add their end date
            raise OperationalException("FreqAI backtesting does not allow open ended timeranges. "
                                       "Please indicate the end date of your desired backtesting. "
                                       "timerange.")

        timerange.startts = timerange.startts - backtest_period_days * SECONDS_IN_DAY
        full_timerange = timerange.timerange_str
        source_config = Path(self.config["config_files"][0])

        models_dir = self.full_path
        if not models_dir.is_dir():
            # first run for this identifier: create the folder and keep a copy of the config
            models_dir.mkdir(parents=True, exist_ok=True)
            destination = Path(models_dir / source_config.parts[-1])
            shutil.copy(source_config.resolve(), destination)

        return full_timerange
491

492
    def check_if_model_expired(self, trained_timestamp: int) -> bool:
1✔
493
        """
494
        A model age checker to determine if the model is trustworthy based on user defined
495
        `expiration_hours` in the configuration file.
496
        :param trained_timestamp: int = The time of training for the most recent model.
497
        :return:
498
            bool = If the model is expired or not.
499
        """
500
        time = datetime.now(tz=timezone.utc).timestamp()
1✔
501
        elapsed_time = (time - trained_timestamp) / 3600  # hours
1✔
502
        max_time = self.freqai_config.get("expiration_hours", 0)
1✔
503
        if max_time > 0:
1✔
504
            return elapsed_time > max_time
1✔
505
        else:
506
            return False
×
507

508
    def check_if_new_training_required(
        self, trained_timestamp: int
    ) -> Tuple[bool, TimeRange, TimeRange]:
        """
        Decide whether a retrain is due and, if so, compute the timeranges for it.
        :param trained_timestamp: timestamp (seconds) of the last training; 0 always
                                  triggers a retrain
        :return: (retrain, trained_timerange, data_load_timerange). Both timeranges are
                 left as default TimeRange() objects when no retrain is required.
        """
        now = datetime.now(tz=timezone.utc).timestamp()
        trained_timerange = TimeRange()
        data_load_timerange = TimeRange()

        timeframes = self.freqai_config["feature_parameters"].get("include_timeframes")

        # longest included timeframe in seconds (0 when the list is empty)
        max_tf_seconds = max([0] + [timeframe_to_seconds(tf) for tf in timeframes])

        # We notice that users like to use exotic indicators where
        # they do not know the required timeperiod. Here we include a factor
        # of safety by multiplying the user considered "max" by 2.
        max_period = self.config.get('startup_candle_count', 20) * 2
        additional_seconds = max_period * max_tf_seconds

        if trained_timestamp == 0:
            # no previous training timestamp available
            retrain = True
        else:
            elapsed_hours = (now - trained_timestamp) / SECONDS_IN_HOUR
            retrain = elapsed_hours > self.freqai_config.get("live_retrain_hours", 0)

        if retrain:
            train_period_seconds = self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
            trained_timerange.startts = int(now - train_period_seconds)
            trained_timerange.stopts = int(now)
            # we want to load/populate indicators on more data than we plan to train on so
            # because most of the indicators have a rolling timeperiod, and are thus NaNs
            # unless they have data further back in time before the start of the train period
            data_load_timerange.startts = int(now - train_period_seconds - additional_seconds)
            data_load_timerange.stopts = int(now)

        return retrain, trained_timerange, data_load_timerange
562

563
    def set_new_model_names(self, pair: str, timestamp_id: int):
1✔
564

565
        coin, _ = pair.split("/")
1✔
566
        self.data_path = Path(
1✔
567
            self.full_path
568
            / f"sub-train-{pair.split('/')[0]}_{timestamp_id}"
569
        )
570

571
        self.model_filename = f"cb_{coin.lower()}_{timestamp_id}"
1✔
572

573
    def set_all_pairs(self) -> None:
1✔
574

575
        self.all_pairs = copy.deepcopy(
1✔
576
            self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
577
        )
578
        for pair in self.config.get("exchange", "").get("pair_whitelist"):
1✔
579
            if pair not in self.all_pairs:
1✔
580
                self.all_pairs.append(pair)
1✔
581

582
    def extract_corr_pair_columns_from_populated_indicators(
1✔
583
        self,
584
        dataframe: DataFrame
585
    ) -> Dict[str, DataFrame]:
586
        """
587
        Find the columns of the dataframe corresponding to the corr_pairlist, save them
588
        in a dictionary to be reused and attached to other pairs.
589

590
        :param dataframe: fully populated dataframe (current pair + corr_pairs)
591
        :return: corr_dataframes, dictionary of dataframes to be attached
592
                 to other pairs in same candle.
593
        """
594
        corr_dataframes: Dict[str, DataFrame] = {}
×
595
        pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
×
596

597
        for pair in pairs:
×
598
            pair = pair.replace(':', '')  # lightgbm doesnt like colons
×
599
            pair_cols = [col for col in dataframe.columns if col.startswith("%")
×
600
                         and f"{pair}_" in col]
601

602
            if pair_cols:
×
603
                pair_cols.insert(0, 'date')
×
604
                corr_dataframes[pair] = dataframe.filter(pair_cols, axis=1)
×
605

606
        return corr_dataframes
×
607

608
    def attach_corr_pair_columns(self, dataframe: DataFrame,
1✔
609
                                 corr_dataframes: Dict[str, DataFrame],
610
                                 current_pair: str) -> DataFrame:
611
        """
612
        Attach the existing corr_pair dataframes to the current pair dataframe before training
613

614
        :param dataframe: current pair strategy dataframe, indicators populated already
615
        :param corr_dataframes: dictionary of saved dataframes from earlier in the same candle
616
        :param current_pair: current pair to which we will attach corr pair dataframe
617
        :return:
618
        :dataframe: current pair dataframe of populated indicators, concatenated with corr_pairs
619
                    ready for training
620
        """
621
        pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
×
622
        current_pair = current_pair.replace(':', '')
×
623
        for pair in pairs:
×
624
            pair = pair.replace(':', '')  # lightgbm doesnt work with colons
×
625
            if current_pair != pair:
×
626
                dataframe = dataframe.merge(corr_dataframes[pair], how='left', on='date')
×
627

628
        return dataframe
×
629

630
    def get_pair_data_for_features(self,
                                   pair: str,
                                   tf: str,
                                   strategy: IStrategy,
                                   corr_dataframes: dict = {},
                                   base_dataframes: dict = {},
                                   is_corr_pairs: bool = False) -> DataFrame:
        """
        Fetch the dataframe for `pair` at timeframe `tf`. The supplied dictionaries are
        consulted first; when the stored dataframe is empty, the strategy's data provider
        is asked instead.
        :param pair: str = pair to get data for
        :param tf: str = timeframe to get data for
        :param strategy: IStrategy = user defined strategy object
        :param corr_dataframes: dict = corr pair dataframes, indexed as [pair][tf]
        :param base_dataframes: dict = current pair dataframes, indexed as [tf]
        :param is_corr_pairs: bool = True to look in corr_dataframes, False to look in
                              base_dataframes
        :return: dataframe = dataframe containing the pair data
        """
        # The dictionaries are only read here, never written to.
        cached = corr_dataframes[pair][tf] if is_corr_pairs else base_dataframes[tf]
        if cached.empty:
            return strategy.dp.get_pair_dataframe(pair=pair, timeframe=tf)
        return cached
663

664
    def merge_features(self, df_main: DataFrame, df_to_merge: DataFrame,
                       tf: str, timeframe_inf: str, suffix: str) -> DataFrame:
        """
        Merge `df_to_merge` into `df_main` via merge_informative_pair (forward filled,
        columns tagged with `suffix`), then drop the suffixed date and OHLCV columns
        that the merge brought along.
        :param df_main: DataFrame = main dataframe
        :param df_to_merge: DataFrame = dataframe to merge
        :param tf: str = timeframe of the main dataframe
        :param timeframe_inf: str = timeframe of the dataframe to merge
        :param suffix: str = suffix to add to the columns of the dataframe to merge
        :return: dataframe = merged dataframe
        """
        merged = merge_informative_pair(
            df_main,
            df_to_merge,
            tf,
            timeframe_inf=timeframe_inf,
            append_timeframe=False,
            suffix=suffix,
            ffill=True,
        )
        # suffixed copies of the raw candle columns are not wanted in the result
        redundant_columns = [
            f"{base}_{suffix}" for base in ("date", "open", "high", "low", "close", "volume")
        ]
        return merged.drop(columns=redundant_columns)
682

683
    def populate_features(self, dataframe: DataFrame, pair: str, strategy: IStrategy,
                          corr_dataframes: dict, base_dataframes: dict,
                          is_corr_pairs: bool = False) -> DataFrame:
        """
        Use the user defined strategy functions for populating features.
        For every configured timeframe this calls feature_engineering_expand_all once per
        entry of `indicator_periods_candles`, calls feature_engineering_expand_basic once,
        appends shifted copies of all "%"-prefixed columns and merges the result into
        `dataframe`.
        :param dataframe: DataFrame = dataframe to populate
        :param pair: str = pair to populate
        :param strategy: IStrategy = user defined strategy object
        :param corr_dataframes: dict = dict containing the df pair dataframes
        :param base_dataframes: dict = dict containing the current pair dataframes
        :param is_corr_pairs: bool = whether the pair is a corr pair or not
        :return: dataframe = populated dataframe
        """
        feature_params = self.freqai_config["feature_parameters"]
        tfs: List[str] = feature_params.get("include_timeframes")

        for tf in tfs:
            metadata = {"pair": pair, "tf": tf}
            informative_df = self.get_pair_data_for_features(
                pair, tf, strategy, corr_dataframes, base_dataframes, is_corr_pairs)
            # pristine candles; every strategy call below receives its own copy of these
            informative_copy = informative_df.copy()

            for t in feature_params["indicator_periods_candles"]:
                df_features = strategy.feature_engineering_expand_all(
                    informative_copy.copy(), t, metadata=metadata)
                informative_df = self.merge_features(
                    informative_df, df_features, tf, tf, f"{t}")

            generic_df = strategy.feature_engineering_expand_basic(
                informative_copy.copy(), metadata=metadata)
            informative_df = self.merge_features(informative_df, generic_df, tf, tf, "gen")

            # The indicator list is fixed before shifting, so only the unshifted
            # indicators are shifted (shifted columns are never shifted again).
            indicators = [col for col in informative_df if col.startswith("%")]
            shifted_frames = [
                informative_df[indicators].shift(n).add_suffix("_shift-" + str(n))
                for n in range(1, feature_params["include_shifted_candles"] + 1)
            ]
            if shifted_frames:
                # one concat for all shifts instead of re-copying the growing frame
                # once per shift
                informative_df = pd.concat([informative_df, *shifted_frames], axis=1)

            dataframe = self.merge_features(dataframe.copy(), informative_df,
                                            self.config["timeframe"], tf, f'{pair}_{tf}')

        return dataframe
728

729
    def use_strategy_to_populate_indicators(  # noqa: C901
        self,
        strategy: IStrategy,
        corr_dataframes: Optional[dict] = None,
        base_dataframes: Optional[dict] = None,
        pair: str = "",
        prediction_dataframe: DataFrame = pd.DataFrame(),
        do_corr_pairs: bool = True,
    ) -> DataFrame:
        """
        Use the user defined strategy for populating indicators during retrain
        :param strategy: IStrategy = user defined strategy object
        :param corr_dataframes: dict = dict containing the df pair dataframes
                                (for user defined timeframes). Missing pair/timeframe
                                entries are filled in with empty dataframes. Defaults
                                to a new empty dict.
        :param base_dataframes: dict = dict containing the current pair dataframes
                                (for user defined timeframes). Missing timeframe
                                entries are filled in with empty dataframes. Defaults
                                to a new empty dict.
        :param pair: str = pair to populate
        :param prediction_dataframe: DataFrame = dataframe containing the pair data
        used for prediction
        :param do_corr_pairs: bool = whether to populate corr pairs or not
        :return:
        dataframe: DataFrame = dataframe containing populated indicators
        :raises OperationalException: if the strategy overrides the deprecated
        populate_any_indicators function
        """
        # Both dicts are written to below. A literal `{}` default would be shared by
        # every call, so entries would leak from one call into the next.
        if corr_dataframes is None:
            corr_dataframes = {}
        if base_dataframes is None:
            base_dataframes = {}

        # check if the user is using the deprecated populate_any_indicators function
        new_version = inspect.getsource(strategy.populate_any_indicators) == (
            inspect.getsource(IStrategy.populate_any_indicators))

        if not new_version:
            raise OperationalException(
                "You are using the `populate_any_indicators()` function"
                " which was deprecated on March 1, 2023. Please refer "
                "to the strategy migration guide to use the new "
                "feature_engineering_* methods: \n"
                f"{DOCS_LINK}/strategy_migration/#freqai-strategy \n"
                "And the feature_engineering_* documentation: \n"
                f"{DOCS_LINK}/freqai-feature-engineering/"
                )

        feature_params = self.freqai_config["feature_parameters"]
        tfs: List[str] = feature_params.get("include_timeframes")
        corr_pairs: List[str] = feature_params.get("include_corr_pairlist", [])

        # make sure every timeframe / corr pair combination has an entry, so the
        # lookups in get_pair_data_for_features cannot raise a KeyError
        for tf in tfs:
            if tf not in base_dataframes:
                base_dataframes[tf] = pd.DataFrame()
            for p in corr_pairs:
                if p not in corr_dataframes:
                    corr_dataframes[p] = {}
                if tf not in corr_dataframes[p]:
                    corr_dataframes[p][tf] = pd.DataFrame()

        if not prediction_dataframe.empty:
            dataframe = prediction_dataframe.copy()
        else:
            dataframe = base_dataframes[self.config["timeframe"]].copy()

        dataframe = self.populate_features(dataframe.copy(), pair, strategy,
                                           corr_dataframes, base_dataframes)
        metadata = {"pair": pair}
        dataframe = strategy.feature_engineering_standard(dataframe.copy(), metadata=metadata)
        # ensure corr pairs are always last
        if do_corr_pairs:
            for corr_pair in corr_pairs:
                if pair == corr_pair:
                    continue  # dont repeat anything from whitelist
                dataframe = self.populate_features(dataframe.copy(), corr_pair, strategy,
                                                   corr_dataframes, base_dataframes, True)

        if self.live:
            dataframe = strategy.set_freqai_targets(dataframe.copy(), metadata=metadata)
            dataframe = self.remove_special_chars_from_feature_names(dataframe)

        self.get_unique_classes_from_labels(dataframe)

        if self.config.get('reduce_df_footprint', False):
            dataframe = reduce_dataframe_footprint(dataframe)

        return dataframe
810

811
    def fit_labels(self) -> None:
        """
        Fit the labels with a gaussian distribution.
        For every non-object column of self.data_dictionary["train_labels"] the fitted
        mean and standard deviation are stored in self.data["labels_mean"] and
        self.data["labels_std"]. Object dtype columns are skipped, and every entry of
        self.unique_class_list is stored with a mean and std of 0.
        """
        # Import the submodule explicitly: a bare `import scipy` only exposes
        # `scipy.stats` on SciPy versions that lazy-load their submodules.
        from scipy import stats

        train_labels = self.data_dictionary["train_labels"]

        self.data["labels_mean"], self.data["labels_std"] = {}, {}
        for label in train_labels.columns:
            if train_labels[label].dtype == object:
                continue
            mean, std = stats.norm.fit(train_labels[label])
            self.data["labels_mean"][label], self.data["labels_std"][label] = mean, std

        # incase targets are classifications
        for label in self.unique_class_list:
            self.data["labels_mean"][label], self.data["labels_std"][label] = 0, 0
829

830
    def remove_features_from_df(self, dataframe: DataFrame) -> DataFrame:
        """
        Strip the feature columns from the dataframe before it is handed back to the
        strategy, which keeps it compact for FreqUI purposes.
        Columns starting with a single "%" are dropped; columns starting with "%%" and
        all remaining columns are kept.
        :param dataframe: DataFrame = dataframe to strip
        :return: dataframe restricted to the kept columns
        """
        def _is_kept(column: str) -> bool:
            return column.startswith("%%") or not column.startswith("%")

        kept_columns = list(filter(_is_kept, dataframe.columns))
        return dataframe[kept_columns]
839

840
    def get_unique_classes_from_labels(self, dataframe: DataFrame) -> None:
        """
        Collect the class names of classification labels.
        After self.find_labels(dataframe) has run, the distinct non-null values of each
        object dtype column named in self.label_list are stored in self.unique_classes,
        and the values of every self.unique_classes entry are appended to
        self.unique_class_list.
        :param dataframe: DataFrame = dataframe containing the label columns
        """
        self.find_labels(dataframe)

        object_labels = (key for key in self.label_list if dataframe[key].dtype == object)
        for key in object_labels:
            self.unique_classes[key] = dataframe[key].dropna().unique()

        # iterating an empty mapping is a no-op, so no emptiness check is needed
        for label in self.unique_classes:
            self.unique_class_list += list(self.unique_classes[label])
852

853
    def save_backtesting_prediction(
        self, append_df: DataFrame
    ) -> None:
        """
        Write the backtesting prediction dataframe to self.backtesting_results_path in
        feather format, creating the predictions folder beforehand when needed.
        :param append_df: dataframe for backtesting period
        """
        predictions_dir = Path(self.full_path / self.backtest_predictions_folder)
        # exist_ok=True makes this a no-op when the directory is already there
        predictions_dir.mkdir(parents=True, exist_ok=True)

        append_df.to_feather(self.backtesting_results_path)
865

866
    def get_backtesting_prediction(
        self
    ) -> DataFrame:
        """
        Load the prediction dataframe stored in feather format at
        self.backtesting_results_path.
        :return: dataframe read from the prediction file
        """
        return pd.read_feather(self.backtesting_results_path)
874

875
    def check_if_backtest_prediction_is_valid(
        self,
        len_backtest_df: int
    ) -> bool:
        """
        Check whether a stored backtesting prediction file can be reused.
        The prediction file path is also stored in self.backtesting_results_path,
        whether or not the file exists.
        :param len_backtest_df: Length of backtesting dataframe slice
        :return:
        :boolean: True when the file exists, holds exactly len_backtest_df rows and
        has a 'date' column; False otherwise.
        """
        prediction_file = Path(self.full_path /
                               self.backtest_predictions_folder /
                               f"{self.model_filename}_prediction.feather")
        self.backtesting_results_path = prediction_file

        if not prediction_file.is_file():
            logger.info(
                f"Could not find backtesting prediction file at {prediction_file}"
            )
            return False

        stored_predictions = self.get_backtesting_prediction()
        if len(stored_predictions) != len_backtest_df or 'date' not in stored_predictions:
            logger.info("A new backtesting prediction file is required. "
                        "(Number of predictions is different from dataframe length or "
                        "old prediction file version).")
            return False

        logger.info(f"Found backtesting prediction file at {prediction_file}")
        return True
908

909
    def get_full_models_path(self, config: Config) -> Path:
        """
        Build the default FreqAI model path:
        <user_data_dir>/models/<freqai identifier>
        :param config: Configuration dictionary
        :return: Path to the model folder
        """
        freqai_section: Dict[str, Any] = config["freqai"]
        models_root = config["user_data_dir"] / "models"
        return Path(models_root / str(freqai_section.get("identifier")))
918

919
    def remove_special_chars_from_feature_names(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """
        Remove all special characters from feature strings (:)
        The column names are rewritten on the passed dataframe itself, and that same
        object is returned.
        :param dataframe: the dataframe that just finished indicator population. (unfiltered)
        :return: dataframe with cleaned feature names
        """

        spec_chars = [':']
        for c in spec_chars:
            # regex=False: always match the character literally instead of relying on
            # the pandas default for `regex`, which is not the same across versions
            dataframe.columns = dataframe.columns.str.replace(c, "", regex=False)

        return dataframe
931

932
    def buffer_timerange(self, timerange: TimeRange):
        """
        Shrink the timerange at both ends by `buffer_train_data_candles` candles of the
        configured timeframe. This is meant to be used *after* the indicators are
        populated.

        The main use case is predicting maxima and minima: argrelextrema cannot know
        the maxima/minima at the edges of the timerange. To improve model accuracy it
        is best to compute argrelextrema on the full timerange and then cut the edges
        (buffer) off with this function.

        If the targets are a shifted price movement instead, the buffer is unnecessary:
        the shifted candles at the end of the timerange will be NaN and FreqAI will
        automatically cut those off of the training dataset.

        :param timerange: TimeRange to buffer, modified in place
        :return: the same timerange object
        """
        buffer = self.freqai_config["feature_parameters"]["buffer_train_data_candles"]
        if not buffer:
            return timerange

        offset_seconds = buffer * timeframe_to_seconds(self.config["timeframe"])
        timerange.stopts -= offset_seconds
        timerange.startts += offset_seconds
        return timerange
953

954
    # deprecated functions
955
    def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
        """
        Deprecated, kept for migration assistance only.
        Logs a warning that points to the data pipeline migration guide and returns
        `data_dictionary` unchanged.
        :param data_dictionary: dictionary that is handed back as-is
        :return: the unmodified data_dictionary
        """
        deprecation_notice = (
            f"Your custom IFreqaiModel relies on the deprecated"
            " data pipeline. Please update your model to use the new data pipeline."
            " This can be achieved by following the migration guide at "
            f"{DOCS_LINK}/strategy_migration/#freqai-new-data-pipeline "
            "We added a basic pipeline for you, but this will be removed "
            "in a future version."
        )
        logger.warning(deprecation_notice)

        return data_dictionary
967

968
    def denormalize_labels_from_metadata(self, df: DataFrame) -> DataFrame:
        """
        Deprecated, kept for migration assistance only.
        Logs a warning that points to the data pipeline migration guide, then runs `df`
        through self.label_pipeline.inverse_transform and returns the first of the
        three values it yields.
        :param df: dataframe passed to the label pipeline
        :return: first element of the inverse transform result
        """
        deprecation_notice = (
            f"Your custom IFreqaiModel relies on the deprecated"
            " data pipeline. Please update your model to use the new data pipeline."
            " This can be achieved by following the migration guide at "
            f"{DOCS_LINK}/strategy_migration/#freqai-new-data-pipeline "
            "We added a basic pipeline for you, but this will be removed "
            "in a future version."
        )
        logger.warning(deprecation_notice)

        # the pipeline yields exactly three values; only the first one is needed
        restored_df, _, _ = self.label_pipeline.inverse_transform(df)
        return restored_df
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc