freqtrade / freqtrade — build 6181253459 (push, github-actions / web-flow)
08 Sep 2023 06:04AM UTC — coverage: 94.614% (+0.06%) from 94.556%

Merge pull request #9159 from stash86/fix-adjust
"remove old codes when we only can do partial entries"

2 of 2 new or added lines in 1 file covered (100.0%).
19114 of 20202 relevant lines covered (94.61%).
0.95 hits per line.

Source file: /freqtrade/freqai/data_drawer.py (file coverage: 64.89%)

import collections
import importlib
import logging
import re
import shutil
import threading
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Tuple, TypedDict

import numpy as np
import pandas as pd
import psutil
import rapidjson
from joblib import dump, load
from joblib.externals import cloudpickle
from numpy.typing import NDArray
from pandas import DataFrame

from freqtrade.configuration import TimeRange
from freqtrade.constants import Config
from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.strategy.interface import IStrategy


logger = logging.getLogger(__name__)

FEATURE_PIPELINE = "feature_pipeline"
LABEL_PIPELINE = "label_pipeline"
TRAINDF = "trained_df"
METADATA = "metadata"


class pair_info(TypedDict):
    model_filename: str
    trained_timestamp: int
    data_path: str
    extras: dict


class FreqaiDataDrawer:
    """
    Class aimed at holding all pair models/info in memory for better inferencing/retraining/saving
    /loading to/from disk.
    This object remains persistent throughout live/dry.

    Record of contribution:
    FreqAI was developed by a group of individuals who all contributed specific skillsets to the
    project.

    Conception and software development:
    Robert Caulk @robcaulk

    Theoretical brainstorming:
    Elin Törnquist @th0rntwig

    Code review, software architecture brainstorming:
    @xmatthias

    Beta testing and bug reporting:
    @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm
    Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
    """

    def __init__(self, full_path: Path, config: Config):

        self.config = config
        self.freqai_info = config.get("freqai", {})
        # dictionary holding all pair metadata necessary to load in from disk
        self.pair_dict: Dict[str, pair_info] = {}
        # dictionary holding all actively inferenced models in memory given a model filename
        self.model_dictionary: Dict[str, Any] = {}
        # all additional metadata that we want to keep in ram
        self.meta_data_dictionary: Dict[str, Dict[str, Any]] = {}
        self.model_return_values: Dict[str, DataFrame] = {}
        self.historic_data: Dict[str, Dict[str, DataFrame]] = {}
        self.historic_predictions: Dict[str, DataFrame] = {}
        self.full_path = full_path
        self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl")
        self.historic_predictions_bkp_path = Path(
            self.full_path / "historic_predictions.backup.pkl")
        self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
        self.global_metadata_path = Path(self.full_path / "global_metadata.json")
        self.metric_tracker_path = Path(self.full_path / "metric_tracker.json")
        self.load_drawer_from_disk()
        self.load_historic_predictions_from_disk()
        self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {}
        self.load_metric_tracker_from_disk()
        self.training_queue: Dict[str, int] = {}
        self.history_lock = threading.Lock()
        self.save_lock = threading.Lock()
        self.pair_dict_lock = threading.Lock()
        self.metric_tracker_lock = threading.Lock()
        self.old_DBSCAN_eps: Dict[str, float] = {}
        self.empty_pair_dict: pair_info = {
                "model_filename": "", "trained_timestamp": 0,
                "data_path": "", "extras": {}}
        self.model_type = self.freqai_info.get('model_save_type', 'joblib')

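    # Usage sketch (illustrative, not part of the original file): the drawer is
    # typically constructed once per bot session by the FreqAI interface, e.g.
    # with a hypothetical model folder path:
    #
    #     drawer = FreqaiDataDrawer(Path("user_data/models/my_identifier"), config)
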
    def update_metric_tracker(self, metric: str, value: float, pair: str) -> None:
        """
        General utility for adding and updating custom metrics. Typically used
        for adding training performance, train timings, inference timings, cpu loads etc.
        """
        with self.metric_tracker_lock:
            if pair not in self.metric_tracker:
                self.metric_tracker[pair] = {}
            if metric not in self.metric_tracker[pair]:
                self.metric_tracker[pair][metric] = {'timestamp': [], 'value': []}

            timestamp = int(datetime.now(timezone.utc).timestamp())
            self.metric_tracker[pair][metric]['value'].append(value)
            self.metric_tracker[pair][metric]['timestamp'].append(timestamp)

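    # Sketch of the resulting structure (illustrative values): parallel
    # timestamp/value lists are kept per pair and per metric, e.g.
    #
    #     drawer.update_metric_tracker('train_time', 12.3, 'BTC/USDT')
    #     # drawer.metric_tracker == {
    #     #     'BTC/USDT': {'train_time': {'timestamp': [1694152840], 'value': [12.3]}}
    #     # }
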
    def collect_metrics(self, time_spent: float, pair: str):
        """
        Add metrics to the metric tracker dictionary
        """
        load1, load5, load15 = psutil.getloadavg()
        cpus = psutil.cpu_count()
        self.update_metric_tracker('train_time', time_spent, pair)
        self.update_metric_tracker('cpu_load1min', load1 / cpus, pair)
        self.update_metric_tracker('cpu_load5min', load5 / cpus, pair)
        self.update_metric_tracker('cpu_load15min', load15 / cpus, pair)

    def load_global_metadata_from_disk(self):
        """
        Locate and load previously saved global metadata from the present model folder.
        """
        exists = self.global_metadata_path.is_file()
        if exists:
            with self.global_metadata_path.open("r") as fp:
                metadata_dict = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
                return metadata_dict
        return {}

    def load_drawer_from_disk(self):
        """
        Locate and load a previously saved data drawer full of all pair model metadata in
        the present model folder.
        Load any existing metric tracker that may be present.
        """
        exists = self.pair_dictionary_path.is_file()
        if exists:
            with self.pair_dictionary_path.open("r") as fp:
                self.pair_dict = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
        else:
            logger.info("Could not find existing datadrawer, starting from scratch")

    def load_metric_tracker_from_disk(self):
        """
        Tries to load an existing metrics dictionary if the user
        wants to collect metrics.
        """
        if self.freqai_info.get('write_metrics_to_disk', False):
            exists = self.metric_tracker_path.is_file()
            if exists:
                with self.metric_tracker_path.open("r") as fp:
                    self.metric_tracker = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
                logger.info("Loading existing metric tracker from disk.")
            else:
                logger.info("Could not find existing metric tracker, starting from scratch")

    def load_historic_predictions_from_disk(self):
        """
        Locate and load previously saved historic predictions.
        :return: bool - whether or not the drawer was located
        """
        exists = self.historic_predictions_path.is_file()
        if exists:
            try:
                with self.historic_predictions_path.open("rb") as fp:
                    self.historic_predictions = cloudpickle.load(fp)
                logger.info(
                    f"Found existing historic predictions at {self.full_path}, but beware "
                    "that statistics may be inaccurate if the bot has been offline for "
                    "an extended period of time."
                )
            except EOFError:
                logger.warning(
                    'Historical prediction file was corrupted. Trying to load backup file.')
                with self.historic_predictions_bkp_path.open("rb") as fp:
                    self.historic_predictions = cloudpickle.load(fp)
                logger.warning('FreqAI successfully loaded the backup historical predictions file.')

        else:
            logger.info("Could not find existing historic_predictions, starting from scratch")

        return exists

    def save_historic_predictions_to_disk(self):
        """
        Save historic predictions pickle to disk
        """
        with self.historic_predictions_path.open("wb") as fp:
            cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)

        # create a backup
        shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path)

    def save_metric_tracker_to_disk(self):
        """
        Save metric tracker of all pair metrics collected.
        """
        with self.save_lock:
            with self.metric_tracker_path.open('w') as fp:
                rapidjson.dump(self.metric_tracker, fp, default=self.np_encoder,
                               number_mode=rapidjson.NM_NATIVE)

    def save_drawer_to_disk(self) -> None:
        """
        Save data drawer full of all pair model metadata in present model folder.
        """
        with self.save_lock:
            with self.pair_dictionary_path.open('w') as fp:
                rapidjson.dump(self.pair_dict, fp, default=self.np_encoder,
                               number_mode=rapidjson.NM_NATIVE)

    def save_global_metadata_to_disk(self, metadata: Dict[str, Any]):
        """
        Save global metadata json to disk
        """
        with self.save_lock:
            with self.global_metadata_path.open('w') as fp:
                rapidjson.dump(metadata, fp, default=self.np_encoder,
                               number_mode=rapidjson.NM_NATIVE)

    def np_encoder(self, object):
        if isinstance(object, np.generic):
            return object.item()

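    # Why the custom encoder, sketched (illustrative): rapidjson cannot serialise
    # numpy scalars natively, so np_encoder converts them to plain Python types:
    #
    #     rapidjson.dump({"std": np.float32(0.5)}, fp, default=self.np_encoder)
    #     # writes {"std": 0.5} instead of raising a TypeError
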
    def get_pair_dict_info(self, pair: str) -> Tuple[str, int]:
        """
        Locate and load existing model metadata from persistent storage. If not located,
        create a new one and append the current pair to it and prepare it for its first
        training.
        :param pair: str: pair to lookup
        :return:
            model_filename: str = unique filename used for loading persistent objects from disk
            trained_timestamp: int = the last time the coin was trained
        """

        pair_dict = self.pair_dict.get(pair)

        if pair_dict:
            model_filename = pair_dict["model_filename"]
            trained_timestamp = pair_dict["trained_timestamp"]
        else:
            self.pair_dict[pair] = self.empty_pair_dict.copy()
            model_filename = ""
            trained_timestamp = 0

        return model_filename, trained_timestamp

    def set_pair_dict_info(self, metadata: dict) -> None:
        pair_in_dict = self.pair_dict.get(metadata["pair"])
        if pair_in_dict:
            return
        else:
            self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy()
            return

    def set_initial_return_values(self, pair: str, pred_df: DataFrame) -> None:
        """
        Set the initial return values to the historical predictions dataframe. This avoids needing
        to repredict on historical candles, and also stores historical predictions despite
        retrainings (so stored predictions are true predictions, not just inferencing on trained
        data)
        """

        hist_df = self.historic_predictions
        len_diff = len(hist_df[pair].index) - len(pred_df.index)
        if len_diff < 0:
            df_concat = pd.concat([pred_df.iloc[:abs(len_diff)], hist_df[pair]],
                                  ignore_index=True, keys=hist_df[pair].keys())
        else:
            df_concat = hist_df[pair].tail(len(pred_df.index)).reset_index(drop=True)
        df_concat = df_concat.fillna(0)
        self.model_return_values[pair] = df_concat

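    # Sketch of the length logic above (illustrative): if the strategy dataframe is
    # longer than the stored history (len_diff < 0), the missing oldest candles are
    # back-filled from pred_df; otherwise the last len(pred_df) rows of the stored
    # history are reused as the initial return values.
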
    def append_model_predictions(self, pair: str, predictions: DataFrame,
                                 do_preds: NDArray[np.int_],
                                 dk: FreqaiDataKitchen, strat_df: DataFrame) -> None:
        """
        Append model predictions to historic predictions dataframe, then set the
        strategy return dataframe to the tail of the historic predictions. The length of
        the tail is equivalent to the length of the dataframe that entered FreqAI from
        the strategy originally. Doing this allows FreqUI to always display the correct
        historic predictions.
        """

        len_df = len(strat_df)
        index = self.historic_predictions[pair].index[-1:]
        columns = self.historic_predictions[pair].columns

        nan_df = pd.DataFrame(np.nan, index=index, columns=columns)
        self.historic_predictions[pair] = pd.concat(
            [self.historic_predictions[pair], nan_df], ignore_index=True, axis=0)
        df = self.historic_predictions[pair]

        # model outputs and associated statistics
        for label in predictions.columns:
            df[label].iloc[-1] = predictions[label].iloc[-1]
            if df[label].dtype == object:
                continue
            df[f"{label}_mean"].iloc[-1] = dk.data["labels_mean"][label]
            df[f"{label}_std"].iloc[-1] = dk.data["labels_std"][label]

        # outlier indicators
        df["do_predict"].iloc[-1] = do_preds[-1]
        if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
            df["DI_values"].iloc[-1] = dk.DI_values[-1]

        # extra values the user added within custom prediction model
        if dk.data['extra_returns_per_train']:
            rets = dk.data['extra_returns_per_train']
            for return_str in rets:
                df[return_str].iloc[-1] = rets[return_str]

        # this logic carries users between versions without needing to
        # change their identifier
        if 'close_price' not in df.columns:
            df['close_price'] = np.nan
            df['date_pred'] = np.nan

        df['close_price'].iloc[-1] = strat_df['close'].iloc[-1]
        df['date_pred'].iloc[-1] = strat_df['date'].iloc[-1]

        self.model_return_values[pair] = df.tail(len_df).reset_index(drop=True)

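    # Sketch of the appended row (illustrative): for a single regression label such
    # as "&-s_close", the new row carries &-s_close, &-s_close_mean, &-s_close_std,
    # do_predict, DI_values (only when DI_threshold > 0), close_price and date_pred.
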
    def attach_return_values_to_return_dataframe(
            self, pair: str, dataframe: DataFrame) -> DataFrame:
        """
        Attach the return values to the strat dataframe
        :param dataframe: DataFrame = strategy dataframe
        :return: DataFrame = strat dataframe with return values attached
        """
        df = self.model_return_values[pair]
        to_keep = [col for col in dataframe.columns if not col.startswith("&")]
        dataframe = pd.concat([dataframe[to_keep], df], axis=1)
        return dataframe

    def return_null_values_to_strategy(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> None:
        """
        Build a 0-filled dataframe to return to the strategy
        """

        dk.find_features(dataframe)
        dk.find_labels(dataframe)

        full_labels = dk.label_list + dk.unique_class_list

        for label in full_labels:
            dataframe[label] = 0
            dataframe[f"{label}_mean"] = 0
            dataframe[f"{label}_std"] = 0

        dataframe["do_predict"] = 0

        if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
            dataframe["DI_values"] = 0

        if dk.data['extra_returns_per_train']:
            rets = dk.data['extra_returns_per_train']
            for return_str in rets:
                dataframe[return_str] = 0

        dk.return_dataframe = dataframe

    def purge_old_models(self) -> None:

        num_keep = self.freqai_info["purge_old_models"]
        if not num_keep:
            return
        elif isinstance(num_keep, bool):
            num_keep = 2

        model_folders = [x for x in self.full_path.iterdir() if x.is_dir()]

        pattern = re.compile(r"sub-train-(\w+)_(\d{10})")

        delete_dict: Dict[str, Any] = {}

        for dir in model_folders:
            result = pattern.match(str(dir.name))
            if result is None:
                continue
            coin = result.group(1)
            timestamp = result.group(2)

            if coin not in delete_dict:
                delete_dict[coin] = {}
                delete_dict[coin]["num_folders"] = 1
                delete_dict[coin]["timestamps"] = {int(timestamp): dir}
            else:
                delete_dict[coin]["num_folders"] += 1
                delete_dict[coin]["timestamps"][int(timestamp)] = dir

        for coin in delete_dict:
            if delete_dict[coin]["num_folders"] > num_keep:
                sorted_dict = collections.OrderedDict(
                    sorted(delete_dict[coin]["timestamps"].items())
                )
                num_delete = len(sorted_dict) - num_keep
                deleted = 0
                for k, v in sorted_dict.items():
                    if deleted >= num_delete:
                        break
                    logger.info(f"Freqai purging old model file {v}")
                    shutil.rmtree(v)
                    deleted += 1

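    # Sketch (illustrative): model folders matched above are named
    # "sub-train-{COIN}_{10-digit unix timestamp}", e.g. a hypothetical
    # "sub-train-BTC_1694152800". With "purge_old_models": 2, only the two most
    # recent folders per coin survive; older ones are deleted oldest-first.
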
    def save_metadata(self, dk: FreqaiDataKitchen) -> None:
        """
        Saves only metadata for backtesting studies if user prefers
        not to save model data. This saves tremendous amounts of space
        for users generating huge studies.
        This is only active when `save_backtest_models`: false (not default)
        """
        if not dk.data_path.is_dir():
            dk.data_path.mkdir(parents=True, exist_ok=True)

        save_path = Path(dk.data_path)

        dk.data["data_path"] = str(dk.data_path)
        dk.data["model_filename"] = str(dk.model_filename)
        dk.data["training_features_list"] = list(dk.data_dictionary["train_features"].columns)
        dk.data["label_list"] = dk.label_list

        with (save_path / f"{dk.model_filename}_{METADATA}.json").open("w") as fp:
            rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)

        return

    def save_data(self, model: Any, coin: str, dk: FreqaiDataKitchen) -> None:
        """
        Saves all data associated with a model for a single sub-train time range
        :param model: User trained model which can be reused for inferencing to generate
                      predictions
        """

        if not dk.data_path.is_dir():
            dk.data_path.mkdir(parents=True, exist_ok=True)

        save_path = Path(dk.data_path)

        # Save the trained model
        if self.model_type == 'joblib':
            dump(model, save_path / f"{dk.model_filename}_model.joblib")
        elif self.model_type == 'keras':
            model.save(save_path / f"{dk.model_filename}_model.h5")
        elif self.model_type in ["stable_baselines3", "sb3_contrib", "pytorch"]:
            model.save(save_path / f"{dk.model_filename}_model.zip")

        dk.data["data_path"] = str(dk.data_path)
        dk.data["model_filename"] = str(dk.model_filename)
        dk.data["training_features_list"] = dk.training_features_list
        dk.data["label_list"] = dk.label_list
        # store the metadata
        with (save_path / f"{dk.model_filename}_{METADATA}.json").open("w") as fp:
            rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)

        # save the pipelines to pickle files
        with (save_path / f"{dk.model_filename}_{FEATURE_PIPELINE}.pkl").open("wb") as fp:
            cloudpickle.dump(dk.feature_pipeline, fp)

        with (save_path / f"{dk.model_filename}_{LABEL_PIPELINE}.pkl").open("wb") as fp:
            cloudpickle.dump(dk.label_pipeline, fp)

        # save the train data to file for post processing if desired
        dk.data_dictionary["train_features"].to_pickle(
            save_path / f"{dk.model_filename}_{TRAINDF}.pkl"
        )

        dk.data_dictionary["train_dates"].to_pickle(
            save_path / f"{dk.model_filename}_trained_dates_df.pkl"
        )

        self.model_dictionary[coin] = model
        self.pair_dict[coin]["model_filename"] = dk.model_filename
        self.pair_dict[coin]["data_path"] = str(dk.data_path)

        if coin not in self.meta_data_dictionary:
            self.meta_data_dictionary[coin] = {}
        self.meta_data_dictionary[coin][METADATA] = dk.data
        self.meta_data_dictionary[coin][FEATURE_PIPELINE] = dk.feature_pipeline
        self.meta_data_dictionary[coin][LABEL_PIPELINE] = dk.label_pipeline
        self.save_drawer_to_disk()

        return

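    # Sketch of the on-disk layout produced above for model_type 'joblib', relative
    # to dk.data_path (filenames follow the constants defined at the top of the module):
    #
    #     {model_filename}_model.joblib
    #     {model_filename}_metadata.json
    #     {model_filename}_feature_pipeline.pkl
    #     {model_filename}_label_pipeline.pkl
    #     {model_filename}_trained_df.pkl
    #     {model_filename}_trained_dates_df.pkl
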
    def load_metadata(self, dk: FreqaiDataKitchen) -> None:
        """
        Load only metadata into datakitchen to increase performance during
        presaved backtesting (prediction file loading).
        """
        with (dk.data_path / f"{dk.model_filename}_{METADATA}.json").open("r") as fp:
            dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
            dk.training_features_list = dk.data["training_features_list"]
            dk.label_list = dk.data["label_list"]

    def load_data(self, coin: str, dk: FreqaiDataKitchen) -> Any:  # noqa: C901
        """
        Loads all data required to make a prediction on a sub-train time range
        :returns:
        :model: User trained model which can be inferenced for new predictions
        """

        if not self.pair_dict[coin]["model_filename"]:
            return None

        if dk.live:
            dk.model_filename = self.pair_dict[coin]["model_filename"]
            dk.data_path = Path(self.pair_dict[coin]["data_path"])

        if coin in self.meta_data_dictionary:
            dk.data = self.meta_data_dictionary[coin][METADATA]
            dk.feature_pipeline = self.meta_data_dictionary[coin][FEATURE_PIPELINE]
            dk.label_pipeline = self.meta_data_dictionary[coin][LABEL_PIPELINE]
        else:
            with (dk.data_path / f"{dk.model_filename}_{METADATA}.json").open("r") as fp:
                dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)

            with (dk.data_path / f"{dk.model_filename}_{FEATURE_PIPELINE}.pkl").open("rb") as fp:
                dk.feature_pipeline = cloudpickle.load(fp)
            with (dk.data_path / f"{dk.model_filename}_{LABEL_PIPELINE}.pkl").open("rb") as fp:
                dk.label_pipeline = cloudpickle.load(fp)

        dk.training_features_list = dk.data["training_features_list"]
        dk.label_list = dk.data["label_list"]

        # try to access model in memory instead of loading object from disk to save time
        if dk.live and coin in self.model_dictionary:
            model = self.model_dictionary[coin]
        elif self.model_type == 'joblib':
            model = load(dk.data_path / f"{dk.model_filename}_model.joblib")
        elif 'stable_baselines' in self.model_type or 'sb3_contrib' == self.model_type:
            mod = importlib.import_module(
                self.model_type, self.freqai_info['rl_config']['model_type'])
            MODELCLASS = getattr(mod, self.freqai_info['rl_config']['model_type'])
            model = MODELCLASS.load(dk.data_path / f"{dk.model_filename}_model")
        elif self.model_type == 'pytorch':
            import torch
            zip = torch.load(dk.data_path / f"{dk.model_filename}_model.zip")
            model = zip["pytrainer"]
            model = model.load_from_checkpoint(zip)

        if not model:
            raise OperationalException(
                f"Unable to load model, ensure model exists at {dk.data_path}"
            )

        # load it into ram if it was loaded from disk
        if coin not in self.model_dictionary:
            self.model_dictionary[coin] = model

        return model

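    # Usage sketch (illustrative): in live mode the model_dictionary cache
    # short-circuits disk I/O on repeat lookups:
    #
    #     model = drawer.load_data('BTC/USDT', dk)  # first call loads from disk, caches
    #     model = drawer.load_data('BTC/USDT', dk)  # second call returns the cached model
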
    def update_historic_data(self, strategy: IStrategy, dk: FreqaiDataKitchen) -> None:
        """
        Append new candles to our stored historic data (in memory) so that
        we do not need to load candle history from disk and we do not need to
        ping the exchange multiple times for the same candle.
        :param dataframe: DataFrame = strategy provided dataframe
        """
        feat_params = self.freqai_info["feature_parameters"]
        with self.history_lock:
            history_data = self.historic_data

            for pair in dk.all_pairs:
                for tf in feat_params.get("include_timeframes"):
                    hist_df = history_data[pair][tf]
                    # check if newest candle is already appended
                    df_dp = strategy.dp.get_pair_dataframe(pair, tf)
                    if len(df_dp.index) == 0:
                        continue
                    if str(hist_df.iloc[-1]["date"]) == str(
                        df_dp.iloc[-1:]["date"].iloc[-1]
                    ):
                        continue

                    try:
                        index = (
                            df_dp.loc[
                                df_dp["date"] == hist_df.iloc[-1]["date"]
                            ].index[0]
                            + 1
                        )
                    except IndexError:
                        if hist_df.iloc[-1]['date'] < df_dp['date'].iloc[0]:
                            raise OperationalException("In memory historical data is older than "
                                                       f"oldest DataProvider candle for {pair} on "
                                                       f"timeframe {tf}")
                        else:
                            index = -1
                            logger.warning(
                                f"No common dates in historical data and dataprovider for {pair}. "
                                f"Appending latest dataprovider candle to historical data "
                                "but please be aware that there is likely a gap in the historical "
                                "data. \n"
                                f"Historical data ends at {hist_df.iloc[-1]['date']} "
                                f"while dataprovider starts at {df_dp['date'].iloc[0]} and "
                                f"ends at {df_dp['date'].iloc[-1]}."
                            )

                    history_data[pair][tf] = pd.concat(
                        [
                            hist_df,
                            df_dp.iloc[index:],
                        ],
                        ignore_index=True,
                        axis=0,
                    )

            self.current_candle = history_data[dk.pair][self.config['timeframe']].iloc[-1]['date']

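    # Sketch of the append logic above (illustrative): if the stored history ends at
    # candle T and the dataprovider holds ..., T, T+1, T+2, the date lookup yields the
    # position after T, so df_dp.iloc[index:] contributes exactly the new candles
    # T+1 and T+2.
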
    def load_all_pair_histories(self, timerange: TimeRange, dk: FreqaiDataKitchen) -> None:
        """
        Load pair histories for all whitelist and corr_pairlist pairs.
        Only called once upon startup of bot.
        :param timerange: TimeRange = full timerange required to populate all indicators
                          for training according to user defined train_period_days
        """
        history_data = self.historic_data

        for pair in dk.all_pairs:
            if pair not in history_data:
                history_data[pair] = {}
            for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
                history_data[pair][tf] = load_pair_history(
                    datadir=self.config["datadir"],
                    timeframe=tf,
                    pair=pair,
                    timerange=timerange,
                    data_format=self.config.get("dataformat_ohlcv", "feather"),
                    candle_type=self.config.get("candle_type_def", CandleType.SPOT),
                )

    def get_base_and_corr_dataframes(
        self, timerange: TimeRange, pair: str, dk: FreqaiDataKitchen
    ) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
        """
        Searches through our historic_data in memory and returns the dataframes relevant
        to the present pair.
        :param timerange: TimeRange = full timerange required to populate all indicators
                          for training according to user defined train_period_days
        :param pair: str = pair of interest
        """
        with self.history_lock:
            corr_dataframes: Dict[Any, Any] = {}
            base_dataframes: Dict[Any, Any] = {}
            historic_data = self.historic_data
            pairs = self.freqai_info["feature_parameters"].get(
                "include_corr_pairlist", []
            )

            for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
                base_dataframes[tf] = dk.slice_dataframe(
                    timerange, historic_data[pair][tf]).reset_index(drop=True)
                if pairs:
                    for p in pairs:
                        if pair in p:
                            continue  # don't repeat anything from whitelist
                        if p not in corr_dataframes:
                            corr_dataframes[p] = {}
                        corr_dataframes[p][tf] = dk.slice_dataframe(
                            timerange, historic_data[p][tf]
                        ).reset_index(drop=True)

        return corr_dataframes, base_dataframes

    def get_timerange_from_live_historic_predictions(self) -> TimeRange:
        """
        Returns timerange information based on historic predictions file
        :return: timerange calculated from saved live data
        """
        if not self.historic_predictions_path.is_file():
            raise OperationalException(
                'Historic predictions not found. Historic predictions data is required '
                'to run backtest with the freqai-backtest-live-models option '
            )

        self.load_historic_predictions_from_disk()

        all_pairs_end_dates = []
        for pair in self.historic_predictions:
            pair_historic_data = self.historic_predictions[pair]
            all_pairs_end_dates.append(pair_historic_data.date_pred.max())

        global_metadata = self.load_global_metadata_from_disk()
        start_date = datetime.fromtimestamp(int(global_metadata["start_dry_live_date"]))
        end_date = max(all_pairs_end_dates)
        # add 1 day to the end of the timerange to ensure the BT module loads all dataframe data
        end_date = end_date + timedelta(days=1)
        backtesting_timerange = TimeRange(
            'date', 'date', int(start_date.timestamp()), int(end_date.timestamp())
        )
        return backtesting_timerange