• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 4131167254

pending completion
4131167254

push

github-actions

GitHub
Merge pull request #7983 from stash86/bt-metrics

16866 of 17748 relevant lines covered (95.03%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.01
/freqtrade/freqai/data_drawer.py
1
import collections
1✔
2
import importlib
1✔
3
import logging
1✔
4
import re
1✔
5
import shutil
1✔
6
import threading
1✔
7
from datetime import datetime, timedelta, timezone
1✔
8
from pathlib import Path
1✔
9
from typing import Any, Dict, Tuple, TypedDict
1✔
10

11
import numpy as np
1✔
12
import pandas as pd
1✔
13
import psutil
1✔
14
import rapidjson
1✔
15
from joblib import dump, load
1✔
16
from joblib.externals import cloudpickle
1✔
17
from numpy.typing import NDArray
1✔
18
from pandas import DataFrame
1✔
19

20
from freqtrade.configuration import TimeRange
1✔
21
from freqtrade.constants import Config
1✔
22
from freqtrade.data.history import load_pair_history
1✔
23
from freqtrade.exceptions import OperationalException
1✔
24
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
1✔
25
from freqtrade.strategy.interface import IStrategy
1✔
26

27

28
logger = logging.getLogger(__name__)
1✔
29

30

31
class pair_info(TypedDict):
    # Per-pair metadata record kept in FreqaiDataDrawer.pair_dict and written to
    # pair_dictionary.json by save_drawer_to_disk().
    model_filename: str  # unique filename used for loading the pair's persistent objects
    trained_timestamp: int  # the last time the pair was trained (0 in the empty template)
    data_path: str  # folder the pair's model files were saved to ("" until a model is saved)
    extras: dict  # additional free-form data (empty in the template)
36

37

38
class FreqaiDataDrawer:
1✔
39
    """
40
    Class aimed at holding all pair models/info in memory for better inferencing/retraining/saving
41
    /loading to/from disk.
42
    This object remains persistent throughout live/dry.
43

44
    Record of contribution:
45
    FreqAI was developed by a group of individuals who all contributed specific skillsets to the
46
    project.
47

48
    Conception and software development:
49
    Robert Caulk @robcaulk
50

51
    Theoretical brainstorming:
52
    Elin Törnquist @th0rntwig
53

54
    Code review, software architecture brainstorming:
55
    @xmatthias
56

57
    Beta testing and bug reporting:
58
    @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm
59
    Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
60
    """
61

62
    def __init__(self, full_path: Path, config: Config, follow_mode: bool = False):
        """
        Prepare the in-memory stores and file locations of the drawer, then restore any
        state that was previously persisted inside `full_path`.
        :param full_path: Path = folder holding the files persisted by the drawer
        :param config: Config = bot configuration (its "freqai" section is kept separately)
        :param follow_mode: bool = if True, a follower dictionary is created before loading
        """
        self.config = config
        self.freqai_info = config.get("freqai", {})
        self.follow_mode = follow_mode
        self.full_path = full_path
        self.follower_name: str = self.config.get("bot_name", "follower1")

        # locations of everything the drawer persists inside full_path
        self.follower_dict_path = Path(
            self.full_path / f"follower_dictionary-{self.follower_name}.json"
        )
        self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl")
        self.historic_predictions_bkp_path = Path(
            self.full_path / "historic_predictions.backup.pkl")
        self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
        self.global_metadata_path = Path(self.full_path / "global_metadata.json")
        self.metric_tracker_path = Path(self.full_path / "metric_tracker.json")

        # dictionary holding all pair metadata necessary to load in from disk
        self.pair_dict: Dict[str, pair_info] = {}
        # dictionary holding all actively inferenced models in memory given a model filename
        self.model_dictionary: Dict[str, Any] = {}
        # all additional metadata that we want to keep in ram
        self.meta_data_dictionary: Dict[str, Dict[str, Any]] = {}
        self.model_return_values: Dict[str, DataFrame] = {}
        self.historic_data: Dict[str, Dict[str, DataFrame]] = {}
        self.historic_predictions: Dict[str, DataFrame] = {}
        self.follower_dict: Dict[str, pair_info] = {}
        self.training_queue: Dict[str, int] = {}
        self.old_DBSCAN_eps: Dict[str, float] = {}
        # template copied whenever a pair is seen for the first time
        self.empty_pair_dict: pair_info = {
                "model_filename": "", "trained_timestamp": 0,
                "data_path": "", "extras": {}}
        self.model_type = self.freqai_info.get('model_save_type', 'joblib')

        # locks guarding the shared stores and the files on disk
        self.history_lock = threading.Lock()
        self.save_lock = threading.Lock()
        self.pair_dict_lock = threading.Lock()
        self.metric_tracker_lock = threading.Lock()

        # restore persisted state; the order of these steps is kept as-is
        if follow_mode:
            self.create_follower_dict()
        self.load_drawer_from_disk()
        self.load_historic_predictions_from_disk()
        self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {}
        self.load_metric_tracker_from_disk()
104

105
    def update_metric_tracker(self, metric: str, value: float, pair: str) -> None:
        """
        Record one observation of a custom metric for a pair, stamped with the current
        UTC time in whole seconds. Entries for unseen pairs/metrics are created on demand.
        :param metric: str = name of the metric
        :param value: float = observed value to append
        :param pair: str = pair the observation belongs to
        """
        with self.metric_tracker_lock:
            pair_metrics = self.metric_tracker.setdefault(pair, {})
            series = pair_metrics.setdefault(metric, {'timestamp': [], 'value': []})

            now = int(datetime.now(timezone.utc).timestamp())
            series['value'].append(value)
            series['timestamp'].append(now)
119

120
    def collect_metrics(self, time_spent: float, pair: str):
        """
        Store the training time together with the 1, 5 and 15 minute load averages
        (each divided by the cpu count) in the metric tracker.
        :param time_spent: float = value recorded as 'train_time'
        :param pair: str = pair the metrics belong to
        """
        load_averages = psutil.getloadavg()
        n_cpus = psutil.cpu_count()
        self.update_metric_tracker('train_time', time_spent, pair)
        for window, load in zip(('1min', '5min', '15min'), load_averages):
            self.update_metric_tracker(f'cpu_load{window}', load / n_cpus, pair)
130

131
    def load_global_metadata_from_disk(self):
        """
        Read the global metadata json stored in the present model folder.
        :return: dict = the stored metadata, or an empty dict when no file exists
        """
        if not self.global_metadata_path.is_file():
            return {}

        with self.global_metadata_path.open("r") as source:
            return rapidjson.load(source, number_mode=rapidjson.NM_NATIVE)
141

142
    def load_drawer_from_disk(self):
        """
        Restore the pair dictionary (all pair model metadata) from the present model
        folder. When the file is missing, the situation is only logged: as info for a
        regular instance, as a warning for a follower.
        """
        if self.pair_dictionary_path.is_file():
            with self.pair_dictionary_path.open("r") as source:
                self.pair_dict = rapidjson.load(source, number_mode=rapidjson.NM_NATIVE)
            return

        if self.follow_mode:
            logger.warning(
                f"Follower could not find pair_dictionary at {self.full_path} "
                "sending null values back to strategy"
            )
        else:
            logger.info("Could not find existing datadrawer, starting from scratch")
159

160
    def load_metric_tracker_from_disk(self):
        """
        Restore a previously saved metric tracker, but only when the user enabled
        `write_metrics_to_disk`; otherwise nothing is touched.
        """
        if not self.freqai_info.get('write_metrics_to_disk', False):
            return

        if not self.metric_tracker_path.is_file():
            logger.info("Could not find existing metric tracker, starting from scratch")
            return

        with self.metric_tracker_path.open("r") as source:
            self.metric_tracker = rapidjson.load(source, number_mode=rapidjson.NM_NATIVE)
        logger.info("Loading existing metric tracker from disk.")
173

174
    def load_historic_predictions_from_disk(self):
        """
        Restore the historic predictions pickle, falling back to the backup copy when
        reading the primary file ends prematurely (EOFError).
        :return: bool - whether or not the primary historic predictions file was located
        """
        found = self.historic_predictions_path.is_file()

        if not found:
            if self.follow_mode:
                logger.warning(
                    f"Follower could not find historic predictions at {self.full_path} "
                    "sending null values back to strategy"
                )
            else:
                logger.info("Could not find existing historic_predictions, starting from scratch")
            return found

        try:
            with self.historic_predictions_path.open("rb") as primary:
                self.historic_predictions = cloudpickle.load(primary)
            logger.info(
                f"Found existing historic predictions at {self.full_path}, but beware "
                "that statistics may be inaccurate if the bot has been offline for "
                "an extended period of time."
            )
        except EOFError:
            logger.warning(
                'Historical prediction file was corrupted. Trying to load backup file.')
            with self.historic_predictions_bkp_path.open("rb") as backup:
                self.historic_predictions = cloudpickle.load(backup)
            logger.warning('FreqAI successfully loaded the backup historical predictions file.')

        return found
205

206
    def save_historic_predictions_to_disk(self):
        """
        Pickle the historic predictions to disk, then duplicate the freshly written
        file as the backup copy.
        """
        primary, backup = self.historic_predictions_path, self.historic_predictions_bkp_path

        with primary.open("wb") as target:
            cloudpickle.dump(
                self.historic_predictions, target, protocol=cloudpickle.DEFAULT_PROTOCOL)

        # the backup is what gets loaded if the primary file turns out to be truncated
        shutil.copy(primary, backup)
215

216
    def save_metric_tracker_to_disk(self):
        """
        Write the metrics collected for all pairs to the metric tracker json file.
        The write happens while holding `save_lock`.
        """
        with self.save_lock, self.metric_tracker_path.open('w') as target:
            rapidjson.dump(
                self.metric_tracker,
                target,
                default=self.np_encoder,
                number_mode=rapidjson.NM_NATIVE,
            )
224

225
    def save_drawer_to_disk(self):
        """
        Write the pair dictionary (all pair model metadata) to the present model folder.
        The write happens while holding `save_lock`.
        """
        with self.save_lock, self.pair_dictionary_path.open('w') as target:
            rapidjson.dump(
                self.pair_dict,
                target,
                default=self.np_encoder,
                number_mode=rapidjson.NM_NATIVE,
            )
233

234
    def save_follower_dict_to_disk(self):
        """
        Write the follower dictionary to disk (used by strategy for persistent
        prediction targets).
        """
        with self.follower_dict_path.open("w") as target:
            rapidjson.dump(
                self.follower_dict,
                target,
                default=self.np_encoder,
                number_mode=rapidjson.NM_NATIVE,
            )
241

242
    def save_global_metadata_to_disk(self, metadata: Dict[str, Any]):
        """
        Write the given global metadata to its json file.
        The write happens while holding `save_lock`.
        :param metadata: Dict[str, Any] = metadata to persist
        """
        with self.save_lock, self.global_metadata_path.open('w') as target:
            rapidjson.dump(
                metadata,
                target,
                default=self.np_encoder,
                number_mode=rapidjson.NM_NATIVE,
            )
250

251
    def create_follower_dict(self):
        """
        Create the dictionary each follower uses to maintain its unique persistent
        prediction targets: one empty entry per whitelisted pair, written to disk
        straight away.
        A configuration without an exchange pair whitelist yields an empty dictionary
        instead of failing while iterating over None.
        """
        # `.get("pair_whitelist")` returns None when the key is absent (or explicitly null)
        whitelist_pairs = self.config.get("exchange", {}).get("pair_whitelist") or []

        if self.follower_dict_path.is_file():
            logger.info("Found an existing follower dictionary")

        for pair in whitelist_pairs:
            self.follower_dict[pair] = {}

        self.save_follower_dict_to_disk()
267

268
    def np_encoder(self, object):
        """
        Fallback encoder passed as `default=` to rapidjson.dump: numpy scalars are
        converted to their native python value, anything else yields None.
        """
        if not isinstance(object, np.generic):
            return None
        return object.item()
271

272
    def get_pair_dict_info(self, pair: str) -> Tuple[str, int, bool]:
        """
        Locate existing model metadata for a pair. If not located (and not running as a
        follower), register the pair with an independent copy of the empty template so
        that it is prepared for its first training.
        :param pair: str: pair to lookup
        :return:
            model_filename: str = unique filename used for loading persistent objects from disk
            trained_timestamp: int = the last time the coin was trained
            return_null_array: bool = Follower could not find pair metadata
        """
        from copy import deepcopy

        pair_dict = self.pair_dict.get(pair)
        data_path_set = self.pair_dict.get(pair, self.empty_pair_dict).get("data_path", "")
        model_filename = ""
        trained_timestamp = 0
        return_null_array = False

        if pair_dict:
            model_filename = pair_dict["model_filename"]
            trained_timestamp = pair_dict["trained_timestamp"]
        elif not self.follow_mode:
            # deep copy: a shallow `.copy()` would make every pair share the template's
            # nested "extras" dict, so writing extras for one pair would leak to all others
            self.pair_dict[pair] = deepcopy(self.empty_pair_dict)

        if not data_path_set and self.follow_mode:
            logger.warning(
                f"Follower could not find current pair {pair} in "
                f"pair_dictionary at path {self.full_path}, sending null values "
                "back to strategy."
            )
            # a follower without a data path never reports a usable model
            trained_timestamp = 0
            model_filename = ''
            return_null_array = True

        return model_filename, trained_timestamp, return_null_array
307

308
    def set_pair_dict_info(self, metadata: dict) -> None:
        """
        Make sure the pair named in `metadata` has an entry in the pair dictionary.
        An existing (non-empty) entry is left untouched, otherwise the pair receives an
        independent copy of the empty template.
        :param metadata: dict = strategy furnished pair metadata, must contain "pair"
        """
        from copy import deepcopy

        pair = metadata["pair"]
        if self.pair_dict.get(pair):
            return

        # deep copy: a shallow `.copy()` would make every pair share the template's
        # nested "extras" dict, so writing extras for one pair would leak to all others
        self.pair_dict[pair] = deepcopy(self.empty_pair_dict)
316

317
    def set_initial_return_values(self, pair: str, pred_df: DataFrame) -> None:
        """
        Seed the return values of a pair from its historic predictions, so historical
        candles do not have to be repredicted and stored predictions stay true
        predictions despite retrainings.
        The result has as many rows as `pred_df`: the tail of the history when the
        history is long enough, otherwise the history preceded by the leading rows of
        `pred_df`. Missing values are replaced by 0.
        :param pair: str = pair whose return values are initialised
        :param pred_df: DataFrame = prediction dataframe defining the required length
        """
        history = self.historic_predictions[pair]
        n_pred = len(pred_df.index)
        shortfall = n_pred - len(history.index)

        if shortfall > 0:
            # history is shorter than requested: pad in front with the first pred rows
            combined = pd.concat([pred_df.iloc[:shortfall], history],
                                 ignore_index=True, keys=history.keys())
        else:
            combined = history.tail(n_pred).reset_index(drop=True)

        self.model_return_values[pair] = combined.fillna(0)
334

335
    def append_model_predictions(self, pair: str, predictions: DataFrame,
                                 do_preds: NDArray[np.int_],
                                 dk: FreqaiDataKitchen, strat_df: DataFrame) -> None:
        """
        Append model predictions to historic predictions dataframe, then set the
        strategy return dataframe to the tail of the historic predictions. The length of
        the tail is equivalent to the length of the dataframe that entered FreqAI from
        the strategy originally. Doing this allows FreqUI to always display the correct
        historic predictions.
        :param pair: str = pair the predictions belong to
        :param predictions: DataFrame = model outputs, only the last row is stored
        :param do_preds: NDArray = outlier indicators, only the last entry is stored
        :param dk: FreqaiDataKitchen = provides label statistics, DI values and extra returns
        :param strat_df: DataFrame = strategy dataframe, provides the tail length and the
                         last 'close' / 'date' values
        """

        len_df = len(strat_df)
        index = self.historic_predictions[pair].index[-1:]
        columns = self.historic_predictions[pair].columns

        # grow the history by one empty row that is filled in below
        nan_df = pd.DataFrame(np.nan, index=index, columns=columns)
        self.historic_predictions[pair] = pd.concat(
            [self.historic_predictions[pair], nan_df], ignore_index=True, axis=0)
        df = self.historic_predictions[pair]

        def set_last(column: str, value: Any) -> None:
            # Write through a single `.iloc` call on the frame itself. The chained form
            # `df[column].iloc[-1] = value` assigns into an intermediate Series, which is
            # not guaranteed to update `df` (and never does under pandas Copy-on-Write).
            df.iloc[-1, df.columns.get_loc(column)] = value

        # model outputs and associated statistics
        for label in predictions.columns:
            set_last(label, predictions[label].iloc[-1])
            if df[label].dtype == object:
                continue
            set_last(f"{label}_mean", dk.data["labels_mean"][label])
            set_last(f"{label}_std", dk.data["labels_std"][label])

        # outlier indicators
        set_last("do_predict", do_preds[-1])
        if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
            set_last("DI_values", dk.DI_values[-1])

        # extra values the user added within custom prediction model
        if dk.data['extra_returns_per_train']:
            rets = dk.data['extra_returns_per_train']
            for return_str in rets:
                set_last(return_str, rets[return_str])

        # this logic carries users between version without needing to
        # change their identifier
        if 'close_price' not in df.columns:
            df['close_price'] = np.nan
            df['date_pred'] = np.nan

        set_last('close_price', strat_df['close'].iloc[-1])
        set_last('date_pred', strat_df['date'].iloc[-1])

        self.model_return_values[pair] = df.tail(len_df).reset_index(drop=True)
384

385
    def attach_return_values_to_return_dataframe(
            self, pair: str, dataframe: DataFrame) -> DataFrame:
        """
        Combine the strat dataframe with the return values stored for the pair. Strategy
        columns whose name starts with "&" are dropped before the return values are
        appended column-wise.
        :param pair: str = pair whose stored return values are attached
        :param dataframe: DataFrame = strategy dataframe
        :return: DataFrame = strat dataframe with return values attached
        """
        return_values = self.model_return_values[pair]

        kept_columns = []
        for column in dataframe.columns:
            if column.startswith("&"):
                continue
            kept_columns.append(column)

        return pd.concat([dataframe[kept_columns], return_values], axis=1)
396

397
    def return_null_values_to_strategy(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> None:
        """
        Fill every column FreqAI would normally return with 0 and hand the dataframe to
        the datakitchen as its return dataframe.
        :param dataframe: DataFrame = strategy dataframe, modified in place
        :param dk: FreqaiDataKitchen = receives the result as `return_dataframe`
        """
        dk.find_features(dataframe)
        dk.find_labels(dataframe)

        # each label gets the prediction itself plus its mean and std columns
        for label in dk.label_list + dk.unique_class_list:
            for suffix in ("", "_mean", "_std"):
                dataframe[f"{label}{suffix}"] = 0

        dataframe["do_predict"] = 0

        di_threshold = self.freqai_info["feature_parameters"].get("DI_threshold", 0)
        if di_threshold > 0:
            dataframe["DI_values"] = 0

        extra_returns = dk.data['extra_returns_per_train']
        if extra_returns:
            for return_str in extra_returns:
                dataframe[return_str] = 0

        dk.return_dataframe = dataframe
423

424
    def purge_old_models(self) -> None:
        """
        Delete outdated model folders inside `full_path`. Folders are recognised by the
        name pattern `sub-train-<coin>_<10 digit timestamp>`; for every coin only the
        two folders with the highest timestamps are kept.
        """
        folder_pattern = re.compile(r"sub-train-(\w+)_(\d{10})")

        # coin -> {timestamp: folder}
        folders_by_coin: Dict[str, Any] = {}
        for folder in [entry for entry in self.full_path.iterdir() if entry.is_dir()]:
            found = folder_pattern.match(str(folder.name))
            if found is None:
                continue
            coin, stamp = found.group(1), int(found.group(2))
            folders_by_coin.setdefault(coin, {})[stamp] = folder

        for stamped_folders in folders_by_coin.values():
            surplus = len(stamped_folders) - 2
            if surplus <= 0:
                continue
            # oldest timestamps first, so the surplus is taken from the old end
            for stamp in sorted(stamped_folders)[:surplus]:
                stale = stamped_folders[stamp]
                logger.info(f"Freqai purging old model file {stale}")
                shutil.rmtree(stale)
460

461
    def update_follower_metadata(self):
        """
        Refresh the follower's view of the pair dictionary and, when the freqai option
        `purge_old_models` is enabled, purge old models afterwards.
        """
        # follower needs to load from disk to get any changes made by leader to pair_dict
        self.load_drawer_from_disk()

        freqai_config = self.config.get("freqai", {})
        if not freqai_config.get("purge_old_models", False):
            return
        self.purge_old_models()
466

467
    def save_metadata(self, dk: FreqaiDataKitchen) -> None:
        """
        Saves only metadata for backtesting studies if user prefers
        not to save model data. This saves tremendous amounts of space
        for users generating huge studies.
        This is only active when `save_backtest_models`: false (not default)
        :param dk: FreqaiDataKitchen = source of the metadata, its `data` dict is
                   extended with the path, filename, feature list and label list
        """
        # no-op when the folder already exists
        dk.data_path.mkdir(parents=True, exist_ok=True)
        target_dir = Path(dk.data_path)

        dk.data["data_path"] = str(dk.data_path)
        dk.data["model_filename"] = str(dk.model_filename)
        feature_columns = dk.data_dictionary["train_features"].columns
        dk.data["training_features_list"] = list(feature_columns)
        dk.data["label_list"] = dk.label_list

        metadata_file = target_dir / f"{dk.model_filename}_metadata.json"
        with open(metadata_file, "w") as target:
            rapidjson.dump(
                dk.data, target, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)
488

489
    def save_data(self, model: Any, coin: str, dk: FreqaiDataKitchen) -> None:
        """
        Saves all data associated with a model for a single sub-train time range
        :param model: User trained model which can be reused for inferencing to generate
                      predictions
        :param coin: str = key under which the model and its metadata are kept in memory
                     and in the pair dictionary
        :param dk: FreqaiDataKitchen = provides the save location, model filename,
                   metadata and training data
        """

        if not dk.data_path.is_dir():
            dk.data_path.mkdir(parents=True, exist_ok=True)

        save_path = Path(dk.data_path)

        # Save the trained model
        if self.model_type == 'joblib':
            dump(model, save_path / f"{dk.model_filename}_model.joblib")
        elif self.model_type == 'keras':
            model.save(save_path / f"{dk.model_filename}_model.h5")
        elif 'stable_baselines' in self.model_type or 'sb3_contrib' == self.model_type:
            model.save(save_path / f"{dk.model_filename}_model.zip")

        if dk.svm_model is not None:
            dump(dk.svm_model, save_path / f"{dk.model_filename}_svm_model.joblib")

        dk.data["data_path"] = str(dk.data_path)
        dk.data["model_filename"] = str(dk.model_filename)
        dk.data["training_features_list"] = dk.training_features_list
        dk.data["label_list"] = dk.label_list
        # store the metadata
        with open(save_path / f"{dk.model_filename}_metadata.json", "w") as fp:
            rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)

        # save the train data to file so we can check preds for area of applicability later
        dk.data_dictionary["train_features"].to_pickle(
            save_path / f"{dk.model_filename}_trained_df.pkl"
        )

        dk.data_dictionary["train_dates"].to_pickle(
            save_path / f"{dk.model_filename}_trained_dates_df.pkl"
        )

        if self.freqai_info["feature_parameters"].get("principal_component_analysis"):
            # context manager so the handle is flushed and closed deterministically
            # instead of being left to the garbage collector
            with open(dk.data_path / f"{dk.model_filename}_pca_object.pkl", "wb") as fp:
                cloudpickle.dump(dk.pca, fp)

        # keep model and metadata in ram so live inferencing can skip the disk
        self.model_dictionary[coin] = model
        self.pair_dict[coin]["model_filename"] = dk.model_filename
        self.pair_dict[coin]["data_path"] = str(dk.data_path)

        if coin not in self.meta_data_dictionary:
            self.meta_data_dictionary[coin] = {}
        self.meta_data_dictionary[coin]["train_df"] = dk.data_dictionary["train_features"]
        self.meta_data_dictionary[coin]["meta_data"] = dk.data
        self.save_drawer_to_disk()

        return
545

546
    def load_metadata(self, dk: FreqaiDataKitchen) -> None:
        """
        Load only metadata into datakitchen to increase performance during
        presaved backtesting (prediction file loading).
        :param dk: FreqaiDataKitchen = receives `data`, `training_features_list` and
                   `label_list` from the stored metadata json
        """
        metadata_file = dk.data_path / f"{dk.model_filename}_metadata.json"
        with open(metadata_file, "r") as source:
            dk.data = rapidjson.load(source, number_mode=rapidjson.NM_NATIVE)

        dk.training_features_list = dk.data["training_features_list"]
        dk.label_list = dk.data["label_list"]
555

556
    def load_data(self, coin: str, dk: FreqaiDataKitchen) -> Any:
        """
        loads all data required to make a prediction on a sub-train time range
        :param coin: str = key of the pair in the pair/model/metadata dictionaries
        :param dk: FreqaiDataKitchen = receives the metadata, training features and
                   auxiliary objects (svm model, pca) that belong to the model
        :returns:
        :model: User trained model which can be inferenced for new predictions,
                or None if no model filename is registered for the coin
        :raises OperationalException: if no model could be obtained, which includes a
                `model_save_type` that matches none of the supported loaders
        """

        if not self.pair_dict[coin]["model_filename"]:
            return None

        if dk.live:
            dk.model_filename = self.pair_dict[coin]["model_filename"]
            dk.data_path = Path(self.pair_dict[coin]["data_path"])

        if coin in self.meta_data_dictionary:
            dk.data = self.meta_data_dictionary[coin]["meta_data"]
            dk.data_dictionary["train_features"] = self.meta_data_dictionary[coin]["train_df"]
        else:
            with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
                dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)

            dk.data_dictionary["train_features"] = pd.read_pickle(
                dk.data_path / f"{dk.model_filename}_trained_df.pkl"
            )

        dk.training_features_list = dk.data["training_features_list"]
        dk.label_list = dk.data["label_list"]

        # bound up front so that an unsupported model type ends in the
        # OperationalException below rather than an UnboundLocalError
        model = None

        # try to access model in memory instead of loading object from disk to save time
        if dk.live and coin in self.model_dictionary:
            model = self.model_dictionary[coin]
        elif self.model_type == 'joblib':
            model = load(dk.data_path / f"{dk.model_filename}_model.joblib")
        elif self.model_type == 'keras':
            from tensorflow import keras
            model = keras.models.load_model(dk.data_path / f"{dk.model_filename}_model.h5")
        elif 'stable_baselines' in self.model_type or 'sb3_contrib' == self.model_type:
            mod = importlib.import_module(
                self.model_type, self.freqai_info['rl_config']['model_type'])
            MODELCLASS = getattr(mod, self.freqai_info['rl_config']['model_type'])
            model = MODELCLASS.load(dk.data_path / f"{dk.model_filename}_model")

        if Path(dk.data_path / f"{dk.model_filename}_svm_model.joblib").is_file():
            dk.svm_model = load(dk.data_path / f"{dk.model_filename}_svm_model.joblib")

        # identity check: a loaded model object must not be rejected just because it
        # happens to evaluate as falsy (e.g. through a custom __len__ / __bool__)
        if model is None:
            raise OperationalException(
                f"Unable to load model, ensure model exists at " f"{dk.data_path} "
            )

        # load it into ram if it was loaded from disk
        if coin not in self.model_dictionary:
            self.model_dictionary[coin] = model

        # `.get` mirrors save_data(): a missing option simply means "no PCA"
        feature_parameters = self.config["freqai"]["feature_parameters"]
        if feature_parameters.get("principal_component_analysis"):
            # context manager so the file handle is closed after loading
            with open(dk.data_path / f"{dk.model_filename}_pca_object.pkl", "rb") as fp:
                dk.pca = cloudpickle.load(fp)

        return model
616

617
    def update_historic_data(self, strategy: IStrategy, dk: FreqaiDataKitchen) -> None:
        """
        Append new candles to our stored historic data (in memory) so that we neither
        need to load candle history from disk nor ping the exchange multiple times for
        the same candle. Afterwards the newest stored candle date of the current pair
        (at the configured timeframe) is recorded as `current_candle`.
        If the last stored candle cannot be found in the dataprovider's dataframe, a
        warning is logged and the update stops right there.
        :param strategy: IStrategy = strategy whose dataprovider supplies the candles
        :param dk: FreqaiDataKitchen = supplies `all_pairs` and the current `pair`
        """
        timeframes = self.freqai_info["feature_parameters"].get("include_timeframes")

        with self.history_lock:
            stored = self.historic_data

            for pair in dk.all_pairs:
                for tf in timeframes:
                    fresh = strategy.dp.get_pair_dataframe(pair, tf)
                    if not len(fresh.index):
                        continue

                    # check if newest candle is already appended
                    last_stored_date = stored[pair][tf].iloc[-1]["date"]
                    if str(last_stored_date) == str(fresh["date"].iloc[-1]):
                        continue

                    # position right after the last stored candle in the fresh dataframe
                    matching_rows = fresh.loc[fresh["date"] == last_stored_date].index
                    try:
                        first_new = matching_rows[0] + 1
                    except IndexError:
                        logger.warning(
                            f"Unable to update pair history for {pair}. "
                            "If this does not resolve itself after 1 additional candle, "
                            "please report the error to #freqai discord channel"
                        )
                        return

                    stored[pair][tf] = pd.concat(
                        [stored[pair][tf], fresh.iloc[first_new:]],
                        ignore_index=True,
                        axis=0,
                    )

            self.current_candle = stored[dk.pair][self.config['timeframe']].iloc[-1]['date']
665

666
    def load_all_pair_histories(self, timerange: TimeRange, dk: FreqaiDataKitchen) -> None:
        """
        Load pair histories for all whitelist and corr_pairlist pairs.
        Only called once upon startup of bot.
        :param timerange: TimeRange = full timerange required to populate all indicators
                          for training according to user defined train_period_days
        :param dk: FreqaiDataKitchen = supplies `all_pairs`, the pairs to load
        """
        stored = self.historic_data

        for pair in dk.all_pairs:
            pair_history = stored.setdefault(pair, {})
            timeframes = self.freqai_info["feature_parameters"].get("include_timeframes")
            for tf in timeframes:
                pair_history[tf] = load_pair_history(
                    datadir=self.config["datadir"],
                    timeframe=tf,
                    pair=pair,
                    timerange=timerange,
                    data_format=self.config.get("dataformat_ohlcv", "json"),
                    candle_type=self.config.get("trading_mode", "spot"),
                )
687

688
    def get_base_and_corr_dataframes(
        self, timerange: TimeRange, pair: str, dk: FreqaiDataKitchen
    ) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
        """
        Gather, from the in-memory historic_data, the dataframes relevant to `pair`,
        each one passed through dk.slice_dataframe and re-indexed from zero.
        The whole collection happens while holding self.history_lock.
        :param timerange: TimeRange = range handed to dk.slice_dataframe for every dataframe
        :param pair: str = pair whose own (base) dataframes are collected
        :param dk: FreqaiDataKitchen = provides slice_dataframe()
        :return: tuple (corr_dataframes, base_dataframes) - note the corr dict comes first.
                 base_dataframes is keyed by timeframe; corr_dataframes is keyed by
                 corr pair and then by timeframe.
        """
        with self.history_lock:
            base_dataframes: Dict[Any, Any] = {}
            corr_dataframes: Dict[Any, Any] = {}
            stored = self.historic_data
            feature_parameters = self.freqai_info["feature_parameters"]
            corr_pairlist = feature_parameters.get("include_corr_pairlist", [])

            for tf in feature_parameters.get("include_timeframes"):
                base_slice = dk.slice_dataframe(timerange, stored[pair][tf])
                base_dataframes[tf] = base_slice.reset_index(drop=True)

                if not corr_pairlist:
                    continue

                for corr_pair in corr_pairlist:
                    # substring test: every corr pair whose name contains `pair` is
                    # skipped, so the base pair's data is not collected a second time
                    if pair not in corr_pair:
                        corr_slice = dk.slice_dataframe(timerange, stored[corr_pair][tf])
                        corr_dataframes.setdefault(corr_pair, {})[tf] = (
                            corr_slice.reset_index(drop=True)
                        )

        return corr_dataframes, base_dataframes
1✔
720

721
    def get_timerange_from_live_historic_predictions(self) -> TimeRange:
        """
        Returns timerange information based on historic predictions file

        The start is read from the "start_dry_live_date" entry of the global metadata;
        the end is the latest `date_pred` found across all pairs, plus one day.

        :return: timerange calculated from saved live data
        :raises OperationalException: if the historic predictions file does not exist, or
                                      if it holds no pairs to derive an end date from
        """
        if not self.historic_predictions_path.is_file():
            raise OperationalException(
                'Historic predictions not found. Historic predictions data is required '
                'to run backtest with the freqai-backtest-live-models option '
            )

        self.load_historic_predictions_from_disk()

        all_pairs_end_dates = [
            pair_predictions.date_pred.max()
            for pair_predictions in self.historic_predictions.values()
        ]
        if not all_pairs_end_dates:
            # max() on an empty list would otherwise surface as an opaque ValueError
            raise OperationalException(
                'Historic predictions contain no pairs. Historic predictions data is '
                'required to run backtest with the freqai-backtest-live-models option'
            )

        global_metadata = self.load_global_metadata_from_disk()
        # build a timezone-aware (UTC) datetime instead of a naive local-time one;
        # the epoch value handed to TimeRange below is unchanged
        start_date = datetime.fromtimestamp(
            int(global_metadata["start_dry_live_date"]), tz=timezone.utc
        )
        # add 1 day to string timerange to ensure BT module will load all dataframe data
        end_date = max(all_pairs_end_dates) + timedelta(days=1)

        return TimeRange(
            'date', 'date', int(start_date.timestamp()), int(end_date.timestamp())
        )
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc