
freqtrade / freqtrade · build 9394559170 (push via github)
26 Apr 2024 06:36AM UTC · coverage: 94.656% (-0.02% from 94.674%)
Commit by xmatthias: "Loader should be passed as kwarg for clarity"
20280 of 21425 relevant lines covered (94.66%) · 0.95 hits per line

Source file: /freqtrade/freqai/data_drawer.py (file coverage: 66.25%)
import collections
import importlib
import logging
import re
import shutil
import threading
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Tuple, TypedDict

import numpy as np
import pandas as pd
import psutil
import rapidjson
from joblib.externals import cloudpickle
from numpy.typing import NDArray
from pandas import DataFrame

from freqtrade.configuration import TimeRange
from freqtrade.constants import Config
from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.strategy.interface import IStrategy


logger = logging.getLogger(__name__)

FEATURE_PIPELINE = "feature_pipeline"
LABEL_PIPELINE = "label_pipeline"
TRAINDF = "trained_df"
METADATA = "metadata"
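# These tokens suffix the on-disk artifact names written by save_data()/save_metadata()
# below, e.g. "<model_filename>_feature_pipeline.pkl" or "<model_filename>_metadata.json".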


class pair_info(TypedDict):
    model_filename: str
    trained_timestamp: int
    data_path: str
    extras: dict
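# A fresh entry (see `empty_pair_dict` in FreqaiDataDrawer.__init__ below) looks like:
#   {"model_filename": "", "trained_timestamp": 0, "data_path": "", "extras": {}}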


class FreqaiDataDrawer:
    """
    Class aimed at holding all pair models/info in memory for better
    inferencing/retraining/saving/loading to/from disk.
    This object remains persistent throughout live/dry runs.

    Record of contribution:
    FreqAI was developed by a group of individuals who all contributed specific skillsets to the
    project.

    Conception and software development:
    Robert Caulk @robcaulk

    Theoretical brainstorming:
    Elin Törnquist @th0rntwig

    Code review, software architecture brainstorming:
    @xmatthias

    Beta testing and bug reporting:
    @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm,
    Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
    """

    def __init__(self, full_path: Path, config: Config):

        self.config = config
        self.freqai_info = config.get("freqai", {})
        # dictionary holding all pair metadata necessary to load in from disk
        self.pair_dict: Dict[str, pair_info] = {}
        # dictionary holding all actively inferenced models in memory given a model filename
        self.model_dictionary: Dict[str, Any] = {}
        # all additional metadata that we want to keep in ram
        self.meta_data_dictionary: Dict[str, Dict[str, Any]] = {}
        self.model_return_values: Dict[str, DataFrame] = {}
        self.historic_data: Dict[str, Dict[str, DataFrame]] = {}
        self.historic_predictions: Dict[str, DataFrame] = {}
        self.full_path = full_path
        self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl")
        self.historic_predictions_bkp_path = Path(
            self.full_path / "historic_predictions.backup.pkl")
        self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
        self.global_metadata_path = Path(self.full_path / "global_metadata.json")
        self.metric_tracker_path = Path(self.full_path / "metric_tracker.json")
        self.load_drawer_from_disk()
        self.load_historic_predictions_from_disk()
        self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {}
        self.load_metric_tracker_from_disk()
        self.training_queue: Dict[str, int] = {}
        self.history_lock = threading.Lock()
        self.save_lock = threading.Lock()
        self.pair_dict_lock = threading.Lock()
        self.metric_tracker_lock = threading.Lock()
        self.old_DBSCAN_eps: Dict[str, float] = {}
        self.empty_pair_dict: pair_info = {
                "model_filename": "", "trained_timestamp": 0,
                "data_path": "", "extras": {}}
        self.model_type = self.freqai_info.get('model_save_type', 'joblib')

    def update_metric_tracker(self, metric: str, value: float, pair: str) -> None:
        """
        General utility for adding and updating custom metrics. Typically used
        for adding training performance, train timings, inference timings, cpu loads etc.
        """
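        # Resulting in-memory shape (illustrative):
        #   {"BTC/USDT": {"train_time": {"timestamp": [<unix ts>, ...], "value": [<float>, ...]}}}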
        with self.metric_tracker_lock:
            if pair not in self.metric_tracker:
                self.metric_tracker[pair] = {}
            if metric not in self.metric_tracker[pair]:
                self.metric_tracker[pair][metric] = {'timestamp': [], 'value': []}

            timestamp = int(datetime.now(timezone.utc).timestamp())
            self.metric_tracker[pair][metric]['value'].append(value)
            self.metric_tracker[pair][metric]['timestamp'].append(timestamp)

    def collect_metrics(self, time_spent: float, pair: str):
        """
        Add metrics to the metric tracker dictionary
        """
        load1, load5, load15 = psutil.getloadavg()
        cpus = psutil.cpu_count()
        self.update_metric_tracker('train_time', time_spent, pair)
        self.update_metric_tracker('cpu_load1min', load1 / cpus, pair)
        self.update_metric_tracker('cpu_load5min', load5 / cpus, pair)
        self.update_metric_tracker('cpu_load15min', load15 / cpus, pair)

    def load_global_metadata_from_disk(self):
        """
        Locate and load previously saved global metadata from the present model folder.
        """
        exists = self.global_metadata_path.is_file()
        if exists:
            with self.global_metadata_path.open("r") as fp:
                metadata_dict = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
                return metadata_dict
        return {}

    def load_drawer_from_disk(self):
        """
        Locate and load a previously saved data drawer full of all pair model metadata in
        the present model folder.
        Load any existing metric tracker that may be present.
        """
        exists = self.pair_dictionary_path.is_file()
        if exists:
            with self.pair_dictionary_path.open("r") as fp:
                self.pair_dict = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
        else:
            logger.info("Could not find existing datadrawer, starting from scratch")

    def load_metric_tracker_from_disk(self):
        """
        Tries to load an existing metrics dictionary if the user
        wants to collect metrics.
        """
        if self.freqai_info.get('write_metrics_to_disk', False):
            exists = self.metric_tracker_path.is_file()
            if exists:
                with self.metric_tracker_path.open("r") as fp:
                    self.metric_tracker = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
                logger.info("Loading existing metric tracker from disk.")
            else:
                logger.info("Could not find existing metric tracker, starting from scratch")

    def load_historic_predictions_from_disk(self):
        """
        Locate and load previously saved historic predictions.
        :return: bool - whether or not the drawer was located
        """
        exists = self.historic_predictions_path.is_file()
        if exists:
            try:
                with self.historic_predictions_path.open("rb") as fp:
                    self.historic_predictions = cloudpickle.load(fp)
                logger.info(
                    f"Found existing historic predictions at {self.full_path}, but beware "
                    "that statistics may be inaccurate if the bot has been offline for "
                    "an extended period of time."
                )
            except EOFError:
                logger.warning(
                    'Historical prediction file was corrupted. Trying to load backup file.')
                with self.historic_predictions_bkp_path.open("rb") as fp:
                    self.historic_predictions = cloudpickle.load(fp)
                logger.warning('FreqAI successfully loaded the backup historical predictions file.')

        else:
            logger.info("Could not find existing historic_predictions, starting from scratch")

        return exists

    def save_historic_predictions_to_disk(self):
        """
        Save historic predictions pickle to disk
        """
        with self.historic_predictions_path.open("wb") as fp:
            cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)

        # create a backup
        shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path)

    def save_metric_tracker_to_disk(self):
        """
        Save metric tracker of all pair metrics collected.
        """
        with self.save_lock:
            with self.metric_tracker_path.open('w') as fp:
                rapidjson.dump(self.metric_tracker, fp, default=self.np_encoder,
                               number_mode=rapidjson.NM_NATIVE)

    def save_drawer_to_disk(self) -> None:
        """
        Save data drawer full of all pair model metadata in present model folder.
        """
        with self.save_lock:
            with self.pair_dictionary_path.open('w') as fp:
                rapidjson.dump(self.pair_dict, fp, default=self.np_encoder,
                               number_mode=rapidjson.NM_NATIVE)

    def save_global_metadata_to_disk(self, metadata: Dict[str, Any]):
        """
        Save global metadata json to disk
        """
        with self.save_lock:
            with self.global_metadata_path.open('w') as fp:
                rapidjson.dump(metadata, fp, default=self.np_encoder,
                               number_mode=rapidjson.NM_NATIVE)

    def np_encoder(self, obj):
        if isinstance(obj, np.generic):
            return obj.item()

    def get_pair_dict_info(self, pair: str) -> Tuple[str, int]:
        """
        Locate and load existing model metadata from persistent storage. If not located,
        create a new one and append the current pair to it and prepare it for its first
        training.
        :param pair: str: pair to lookup
        :return:
            model_filename: str = unique filename used for loading persistent objects from disk
            trained_timestamp: int = the last time the coin was trained
        """

        pair_dict = self.pair_dict.get(pair)

        if pair_dict:
            model_filename = pair_dict["model_filename"]
            trained_timestamp = pair_dict["trained_timestamp"]
        else:
            self.pair_dict[pair] = self.empty_pair_dict.copy()
            model_filename = ""
            trained_timestamp = 0

        return model_filename, trained_timestamp

    def set_pair_dict_info(self, metadata: dict) -> None:
        pair_in_dict = self.pair_dict.get(metadata["pair"])
        if pair_in_dict:
            return
        else:
            self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy()
            return

    def set_initial_return_values(
        self, pair: str,
        pred_df: DataFrame,
        dataframe: DataFrame
    ) -> None:
        """
        Set the initial return values to the historical predictions dataframe. This avoids needing
        to repredict on historical candles, and also stores historical predictions despite
        retrainings (so stored predictions are true predictions, not just inferencing on trained
        data).

        We also aim to keep the dates from historical predictions so that FreqUI displays
        zeros during any downtime (between FreqAI reloads).
        """
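        # Illustrative: if the saved predictions end at candle T and the strategy supplies
        # candles [T - n, ..., T + m], the overlapping rows up to T are dropped from the new
        # frame and the remaining gap rows are zero-filled below, so downtime shows in FreqUI.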

        new_pred = pred_df.copy()
        # set new_pred values to nans (we want to signal to user that there was nothing
        # historically made during downtime. The newest pred will get appended later in
        # append_model_predictions)

        new_pred["date_pred"] = dataframe["date"]
        # set everything to nan except date_pred
        columns_to_nan = new_pred.columns.difference(['date_pred', 'date'])
        new_pred[columns_to_nan] = new_pred[columns_to_nan].astype(
            float).values * np.nan

        hist_preds = self.historic_predictions[pair].copy()

        # ensure both dataframes have the same date format so they can be merged
        new_pred["date_pred"] = pd.to_datetime(new_pred["date_pred"])
        hist_preds["date_pred"] = pd.to_datetime(hist_preds["date_pred"])

        # find the closest common date between new_pred and historic predictions
        # and cut off the new_pred dataframe at that date
        common_dates = pd.merge(new_pred, hist_preds,
                                on="date_pred", how="inner")
        if len(common_dates.index) > 0:
            new_pred = new_pred.iloc[len(common_dates):]
        else:
            logger.warning("No common dates found between new predictions and historic "
                           "predictions. You likely left your FreqAI instance offline "
                           f"for more than {len(dataframe.index)} candles.")

        # Pandas warns that it's keeping dtypes of non-NaN columns...
        # yes, we know, and we already want that behavior. Ignoring.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=FutureWarning)
            # reindex new_pred columns to match the historic predictions dataframe
            new_pred_reindexed = new_pred.reindex(columns=hist_preds.columns)
            df_concat = pd.concat(
                [hist_preds, new_pred_reindexed],
                ignore_index=True
            )

        # any missing values will get zeroed out so users can see the exact
        # downtime in FreqUI
        df_concat = df_concat.fillna(0)
        self.historic_predictions[pair] = df_concat
        self.model_return_values[pair] = df_concat.tail(
            len(dataframe.index)).reset_index(drop=True)

    def append_model_predictions(self, pair: str, predictions: DataFrame,
                                 do_preds: NDArray[np.int_],
                                 dk: FreqaiDataKitchen, strat_df: DataFrame) -> None:
        """
        Append model predictions to the historic predictions dataframe, then set the
        strategy return dataframe to the tail of the historic predictions. The length of
        the tail is equivalent to the length of the dataframe that entered FreqAI from
        the strategy originally. Doing this allows FreqUI to always display the correct
        historic predictions.
        """
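        # A zero-filled row is appended first, then overwritten in place with the newest
        # model outputs and label statistics, outlier flags, user extra returns, and the
        # latest candle's prices/date from the strategy dataframe.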

        len_df = len(strat_df)
        index = self.historic_predictions[pair].index[-1:]
        columns = self.historic_predictions[pair].columns

        zeros_df = pd.DataFrame(
            np.zeros((1, len(columns))),
            index=index,
            columns=columns
        )
        self.historic_predictions[pair] = pd.concat(
            [self.historic_predictions[pair], zeros_df],
            ignore_index=True,
            axis=0
        )
        df = self.historic_predictions[pair]

        # model outputs and associated statistics
        for label in predictions.columns:
            label_loc = df.columns.get_loc(label)
            pred_label_loc = predictions.columns.get_loc(label)
            df.iloc[-1, label_loc] = predictions.iloc[-1, pred_label_loc]
            if df[label].dtype == object:
                continue
            label_mean_loc = df.columns.get_loc(f"{label}_mean")
            label_std_loc = df.columns.get_loc(f"{label}_std")
            df.iloc[-1, label_mean_loc] = dk.data["labels_mean"][label]
            df.iloc[-1, label_std_loc] = dk.data["labels_std"][label]

        # outlier indicators
        do_predict_loc = df.columns.get_loc("do_predict")
        df.iloc[-1, do_predict_loc] = do_preds[-1]
        if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
            DI_values_loc = df.columns.get_loc("DI_values")
            df.iloc[-1, DI_values_loc] = dk.DI_values[-1]

        # extra values the user added within custom prediction model
        if dk.data['extra_returns_per_train']:
            rets = dk.data['extra_returns_per_train']
            for return_str in rets:
                return_loc = df.columns.get_loc(return_str)
                df.iloc[-1, return_loc] = rets[return_str]

        high_price_loc = df.columns.get_loc("high_price")
        high_loc = strat_df.columns.get_loc("high")
        df.iloc[-1, high_price_loc] = strat_df.iloc[-1, high_loc]
        low_price_loc = df.columns.get_loc("low_price")
        low_loc = strat_df.columns.get_loc("low")
        df.iloc[-1, low_price_loc] = strat_df.iloc[-1, low_loc]
        close_price_loc = df.columns.get_loc("close_price")
        close_loc = strat_df.columns.get_loc("close")
        df.iloc[-1, close_price_loc] = strat_df.iloc[-1, close_loc]
        date_pred_loc = df.columns.get_loc("date_pred")
        date_loc = strat_df.columns.get_loc("date")
        df.iloc[-1, date_pred_loc] = strat_df.iloc[-1, date_loc]

        self.model_return_values[pair] = df.tail(len_df).reset_index(drop=True)

    def attach_return_values_to_return_dataframe(
            self, pair: str, dataframe: DataFrame) -> DataFrame:
        """
        Attach the return values to the strat dataframe
        :param dataframe: DataFrame = strategy dataframe
        :return: DataFrame = strat dataframe with return values attached
        """
        df = self.model_return_values[pair]
        to_keep = [col for col in dataframe.columns if not col.startswith("&")]
        dataframe = pd.concat([dataframe[to_keep], df], axis=1)
        return dataframe

    def return_null_values_to_strategy(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> None:
        """
        Build a 0-filled dataframe to return to the strategy
        """

        dk.find_features(dataframe)
        dk.find_labels(dataframe)

        full_labels = dk.label_list + dk.unique_class_list

        for label in full_labels:
            dataframe[label] = 0
            dataframe[f"{label}_mean"] = 0
            dataframe[f"{label}_std"] = 0

        dataframe["do_predict"] = 0

        if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
            dataframe["DI_values"] = 0

        if dk.data['extra_returns_per_train']:
            rets = dk.data['extra_returns_per_train']
            for return_str in rets:
                dataframe[return_str] = 0

        dk.return_dataframe = dataframe

    def purge_old_models(self) -> None:

        num_keep = self.freqai_info["purge_old_models"]
        if not num_keep:
            return
        elif isinstance(num_keep, bool):
            num_keep = 2

        model_folders = [x for x in self.full_path.iterdir() if x.is_dir()]

        pattern = re.compile(r"sub-train-(\w+)_(\d{10})")
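        # model folders are named like "sub-train-<COIN>_<10-digit unix timestamp>";
        # everything but the `num_keep` newest folders per coin is deleted below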

        delete_dict: Dict[str, Any] = {}

        for dir in model_folders:
            result = pattern.match(str(dir.name))
            if result is None:
                continue
            coin = result.group(1)
            timestamp = result.group(2)

            if coin not in delete_dict:
                delete_dict[coin] = {}
                delete_dict[coin]["num_folders"] = 1
                delete_dict[coin]["timestamps"] = {int(timestamp): dir}
            else:
                delete_dict[coin]["num_folders"] += 1
                delete_dict[coin]["timestamps"][int(timestamp)] = dir

        for coin in delete_dict:
            if delete_dict[coin]["num_folders"] > num_keep:
                sorted_dict = collections.OrderedDict(
                    sorted(delete_dict[coin]["timestamps"].items())
                )
                num_delete = len(sorted_dict) - num_keep
                deleted = 0
                for k, v in sorted_dict.items():
                    if deleted >= num_delete:
                        break
                    logger.info(f"Freqai purging old model file {v}")
                    shutil.rmtree(v)
                    deleted += 1

    def save_metadata(self, dk: FreqaiDataKitchen) -> None:
        """
        Saves only metadata for backtesting studies if the user prefers
        not to save model data. This saves tremendous amounts of space
        for users generating huge studies.
        This is only active when `save_backtest_models: false` (not the default).
        """
        if not dk.data_path.is_dir():
            dk.data_path.mkdir(parents=True, exist_ok=True)

        save_path = Path(dk.data_path)

        dk.data["data_path"] = str(dk.data_path)
        dk.data["model_filename"] = str(dk.model_filename)
        dk.data["training_features_list"] = list(dk.data_dictionary["train_features"].columns)
        dk.data["label_list"] = dk.label_list

        with (save_path / f"{dk.model_filename}_{METADATA}.json").open("w") as fp:
            rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)

        return

    def save_data(self, model: Any, coin: str, dk: FreqaiDataKitchen) -> None:
        """
        Saves all data associated with a model for a single sub-train time range
        :param model: User trained model which can be reused for inferencing to generate
                      predictions
        """
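        # Artifacts written below, all prefixed by dk.model_filename:
        #   _model.joblib | _model.h5 | _model.zip   (depends on model_save_type)
        #   _metadata.json, _feature_pipeline.pkl, _label_pipeline.pkl,
        #   _trained_df.pkl, _trained_dates_df.pkl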

        if not dk.data_path.is_dir():
            dk.data_path.mkdir(parents=True, exist_ok=True)

        save_path = Path(dk.data_path)

        # Save the trained model
        if self.model_type == 'joblib':
            with (save_path / f"{dk.model_filename}_model.joblib").open("wb") as fp:
                cloudpickle.dump(model, fp)
        elif self.model_type == 'keras':
            model.save(save_path / f"{dk.model_filename}_model.h5")
        elif self.model_type in ["stable_baselines3", "sb3_contrib", "pytorch"]:
            model.save(save_path / f"{dk.model_filename}_model.zip")

        dk.data["data_path"] = str(dk.data_path)
        dk.data["model_filename"] = str(dk.model_filename)
        dk.data["training_features_list"] = dk.training_features_list
        dk.data["label_list"] = dk.label_list
        # store the metadata
        with (save_path / f"{dk.model_filename}_{METADATA}.json").open("w") as fp:
            rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)

        # save the pipelines to pickle files
        with (save_path / f"{dk.model_filename}_{FEATURE_PIPELINE}.pkl").open("wb") as fp:
            cloudpickle.dump(dk.feature_pipeline, fp)

        with (save_path / f"{dk.model_filename}_{LABEL_PIPELINE}.pkl").open("wb") as fp:
            cloudpickle.dump(dk.label_pipeline, fp)

        # save the train data to file for post processing if desired
        dk.data_dictionary["train_features"].to_pickle(
            save_path / f"{dk.model_filename}_{TRAINDF}.pkl"
        )

        dk.data_dictionary["train_dates"].to_pickle(
            save_path / f"{dk.model_filename}_trained_dates_df.pkl"
        )

        self.model_dictionary[coin] = model
        self.pair_dict[coin]["model_filename"] = dk.model_filename
        self.pair_dict[coin]["data_path"] = str(dk.data_path)

        if coin not in self.meta_data_dictionary:
            self.meta_data_dictionary[coin] = {}
        self.meta_data_dictionary[coin][METADATA] = dk.data
        self.meta_data_dictionary[coin][FEATURE_PIPELINE] = dk.feature_pipeline
        self.meta_data_dictionary[coin][LABEL_PIPELINE] = dk.label_pipeline
        self.save_drawer_to_disk()

        return

    def load_metadata(self, dk: FreqaiDataKitchen) -> None:
        """
        Load only metadata into the datakitchen to increase performance during
        presaved backtesting (prediction file loading).
        """
        with (dk.data_path / f"{dk.model_filename}_{METADATA}.json").open("r") as fp:
            dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
            dk.training_features_list = dk.data["training_features_list"]
            dk.label_list = dk.data["label_list"]

    def load_data(self, coin: str, dk: FreqaiDataKitchen) -> Any:  # noqa: C901
        """
        Loads all data required to make a prediction on a sub-train time range.
        :returns:
        :model: User trained model which can be inferenced for new predictions
        """
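        # Lookup order: in-memory caches first (meta_data_dictionary for metadata and
        # pipelines, model_dictionary for the model itself); only on a miss are the
        # json/pickle artifacts and the model file read back from disk.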

        if not self.pair_dict[coin]["model_filename"]:
            return None

        if dk.live:
            dk.model_filename = self.pair_dict[coin]["model_filename"]
            dk.data_path = Path(self.pair_dict[coin]["data_path"])

        if coin in self.meta_data_dictionary:
            dk.data = self.meta_data_dictionary[coin][METADATA]
            dk.feature_pipeline = self.meta_data_dictionary[coin][FEATURE_PIPELINE]
            dk.label_pipeline = self.meta_data_dictionary[coin][LABEL_PIPELINE]
        else:
            with (dk.data_path / f"{dk.model_filename}_{METADATA}.json").open("r") as fp:
                dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)

            with (dk.data_path / f"{dk.model_filename}_{FEATURE_PIPELINE}.pkl").open("rb") as fp:
                dk.feature_pipeline = cloudpickle.load(fp)
            with (dk.data_path / f"{dk.model_filename}_{LABEL_PIPELINE}.pkl").open("rb") as fp:
                dk.label_pipeline = cloudpickle.load(fp)

        dk.training_features_list = dk.data["training_features_list"]
        dk.label_list = dk.data["label_list"]

        model = None  # ensures the guard below fires if no model type branch matches
        # try to access model in memory instead of loading object from disk to save time
        if dk.live and coin in self.model_dictionary:
            model = self.model_dictionary[coin]
        elif self.model_type == 'joblib':
            with (dk.data_path / f"{dk.model_filename}_model.joblib").open("rb") as fp:
                model = cloudpickle.load(fp)
        elif 'stable_baselines' in self.model_type or 'sb3_contrib' == self.model_type:
            mod = importlib.import_module(
                self.model_type, self.freqai_info['rl_config']['model_type'])
            MODELCLASS = getattr(mod, self.freqai_info['rl_config']['model_type'])
            model = MODELCLASS.load(dk.data_path / f"{dk.model_filename}_model")
        elif self.model_type == 'pytorch':
            import torch
            checkpoint = torch.load(dk.data_path / f"{dk.model_filename}_model.zip")
            model = checkpoint["pytrainer"]
            model = model.load_from_checkpoint(checkpoint)

        if not model:
            raise OperationalException(
                f"Unable to load model, ensure model exists at {dk.data_path}"
            )

        # load it into ram if it was loaded from disk
        if coin not in self.model_dictionary:
            self.model_dictionary[coin] = model

        return model

    def update_historic_data(self, strategy: IStrategy, dk: FreqaiDataKitchen) -> None:
        """
        Append new candles to our stored historic data (in memory) so that
        we do not need to load candle history from disk and we do not need to
        ping the exchange multiple times for the same candle.
        :param dataframe: DataFrame = strategy provided dataframe
        """
        feat_params = self.freqai_info["feature_parameters"]
        with self.history_lock:
            history_data = self.historic_data

            for pair in dk.all_pairs:
                for tf in feat_params.get("include_timeframes"):
                    hist_df = history_data[pair][tf]
                    # check if newest candle is already appended
                    df_dp = strategy.dp.get_pair_dataframe(pair, tf)
                    if len(df_dp.index) == 0:
                        continue
                    if str(hist_df.iloc[-1]["date"]) == str(
                        df_dp.iloc[-1:]["date"].iloc[-1]
                    ):
                        continue

                    try:
                        index = (
                            df_dp.loc[
                                df_dp["date"] == hist_df.iloc[-1]["date"]
                            ].index[0]
                            + 1
                        )
                    except IndexError:
                        if hist_df.iloc[-1]['date'] < df_dp['date'].iloc[0]:
                            raise OperationalException("In memory historical data is older than "
                                                       f"oldest DataProvider candle for {pair} on "
                                                       f"timeframe {tf}")
                        else:
                            index = -1
                            logger.warning(
                                f"No common dates in historical data and dataprovider for {pair}. "
                                f"Appending latest dataprovider candle to historical data "
                                "but please be aware that there is likely a gap in the historical "
                                "data. \n"
                                f"Historical data ends at {hist_df.iloc[-1]['date']} "
                                f"while dataprovider starts at {df_dp['date'].iloc[0]} and "
                                f"ends at {df_dp['date'].iloc[-1]}."
                            )

                    history_data[pair][tf] = pd.concat(
                        [
                            hist_df,
                            df_dp.iloc[index:],
                        ],
                        ignore_index=True,
                        axis=0,
                    )

            self.current_candle = history_data[dk.pair][self.config['timeframe']].iloc[-1]['date']

    def load_all_pair_histories(self, timerange: TimeRange, dk: FreqaiDataKitchen) -> None:
        """
        Load pair histories for all whitelist and corr_pairlist pairs.
        Only called once upon startup of bot.
        :param timerange: TimeRange = full timerange required to populate all indicators
                          for training according to user defined train_period_days
        """
        history_data = self.historic_data

        for pair in dk.all_pairs:
            if pair not in history_data:
                history_data[pair] = {}
            for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
                history_data[pair][tf] = load_pair_history(
                    datadir=self.config["datadir"],
                    timeframe=tf,
                    pair=pair,
                    timerange=timerange,
                    data_format=self.config.get("dataformat_ohlcv", "feather"),
                    candle_type=self.config.get("candle_type_def", CandleType.SPOT),
                )

    def get_base_and_corr_dataframes(
        self, timerange: TimeRange, pair: str, dk: FreqaiDataKitchen
    ) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
        """
        Searches through our historic_data in memory and returns the dataframes relevant
        to the present pair.
        :param timerange: TimeRange = full timerange required to populate all indicators
                          for training according to user defined train_period_days
        :param metadata: dict = strategy furnished pair metadata
        """
        with self.history_lock:
            corr_dataframes: Dict[Any, Any] = {}
            base_dataframes: Dict[Any, Any] = {}
            historic_data = self.historic_data
            pairs = self.freqai_info["feature_parameters"].get(
                "include_corr_pairlist", []
            )

            for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
                base_dataframes[tf] = dk.slice_dataframe(
                    timerange, historic_data[pair][tf]).reset_index(drop=True)
                if pairs:
                    for p in pairs:
                        if pair in p:
                            continue  # don't repeat anything from whitelist
                        if p not in corr_dataframes:
                            corr_dataframes[p] = {}
                        corr_dataframes[p][tf] = dk.slice_dataframe(
                            timerange, historic_data[p][tf]
                        ).reset_index(drop=True)

        return corr_dataframes, base_dataframes

    def get_timerange_from_live_historic_predictions(self) -> TimeRange:
        """
        Returns timerange information based on the historic predictions file
        :return: timerange calculated from saved live data
        """
        if not self.historic_predictions_path.is_file():
            raise OperationalException(
                'Historic predictions not found. Historic predictions data is required '
                'to run backtest with the freqai-backtest-live-models option '
            )

        self.load_historic_predictions_from_disk()

        all_pairs_end_dates = []
        for pair in self.historic_predictions:
            pair_historic_data = self.historic_predictions[pair]
            all_pairs_end_dates.append(pair_historic_data.date_pred.max())

        global_metadata = self.load_global_metadata_from_disk()
        start_date = datetime.fromtimestamp(int(global_metadata["start_dry_live_date"]))
        end_date = max(all_pairs_end_dates)
        # add 1 day to string timerange to ensure BT module will load all dataframe data
        end_date = end_date + timedelta(days=1)
        backtesting_timerange = TimeRange(
            'date', 'date', int(start_date.timestamp()), int(end_date.timestamp())
        )
        return backtesting_timerange
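
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of this module): how a caller might
# construct the drawer. Assumes a valid freqtrade `config` dict is in scope;
# the directory path is hypothetical.
#
#     from pathlib import Path
#     from freqtrade.freqai.data_drawer import FreqaiDataDrawer
#
#     model_dir = Path("user_data/models/example_run")
#     model_dir.mkdir(parents=True, exist_ok=True)
#     drawer = FreqaiDataDrawer(full_path=model_dir, config=config)
#
#     # An untrained pair yields an empty filename and a zero timestamp:
#     model_filename, trained_ts = drawer.get_pair_dict_info("BTC/USDT")
# ---------------------------------------------------------------------------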