freqtrade / freqtrade / build 9394559170 (push, via github)
26 Apr 2024 06:36AM UTC coverage: 94.656% (-0.02%) from 94.674%
Commit by xmatthias: "Loader should be passed as kwarg for clarity"
20280 of 21425 relevant lines covered (94.66%), 0.95 hits per line

Source file: /freqtrade/data/history/datahandlers/idatahandler.py (96.35% covered)

"""
Abstract datahandler interface.
Its subclasses handle loading and storing data from disk.

"""
import logging
import re
from abc import ABC, abstractmethod
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple, Type

from pandas import DataFrame

from freqtrade import misc
from freqtrade.configuration import TimeRange
from freqtrade.constants import DEFAULT_TRADES_COLUMNS, ListPairsWithTimeframes
from freqtrade.data.converter import (clean_ohlcv_dataframe, trades_convert_types,
                                      trades_df_remove_duplicates, trim_dataframe)
from freqtrade.enums import CandleType, TradingMode
from freqtrade.exchange import timeframe_to_seconds


logger = logging.getLogger(__name__)


class IDataHandler(ABC):

    _OHLCV_REGEX = r'^([a-zA-Z_\d-]+)\-(\d+[a-zA-Z]{1,2})\-?([a-zA-Z_]*)?(?=\.)'
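    # Illustrative filename parsing (hypothetical files, assuming a ".feather" extension):
    # "XRP_USDT-5m.feather" splits into pair "XRP_USDT", timeframe "5m" and an empty
    # candle-type suffix, while "XRP_USDT_USDT-1h-futures.feather" splits into
    # ("XRP_USDT_USDT", "1h", "futures").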

    def __init__(self, datadir: Path) -> None:
        self._datadir = datadir

    @classmethod
    def _get_file_extension(cls) -> str:
        """
        Get file extension for this particular datahandler
        """
        raise NotImplementedError()

    @classmethod
    def ohlcv_get_available_data(
            cls, datadir: Path, trading_mode: TradingMode) -> ListPairsWithTimeframes:
        """
        Returns a list of all pairs with ohlcv data available in this datadir
        :param datadir: Directory to search for ohlcv files
        :param trading_mode: trading-mode to be used
        :return: List of Tuples of (pair, timeframe, CandleType)
        """
        if trading_mode == TradingMode.FUTURES:
            datadir = datadir.joinpath('futures')
        _tmp = [
            re.search(
                cls._OHLCV_REGEX, p.name
            ) for p in datadir.glob(f"*.{cls._get_file_extension()}")]
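        # Each matching filename yields a (pair, timeframe, candle type) tuple. For a
        # hypothetical datadir containing only "XRP_USDT-5m.feather", an illustrative
        # result would be [("XRP/USDT", "5m", CandleType.SPOT)], assuming
        # CandleType.from_string("") maps to SPOT.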
        return [
            (
                cls.rebuild_pair_from_filename(match[1]),
                cls.rebuild_timeframe_from_filename(match[2]),
                CandleType.from_string(match[3])
            ) for match in _tmp if match and len(match.groups()) > 1]

    @classmethod
    def ohlcv_get_pairs(cls, datadir: Path, timeframe: str, candle_type: CandleType) -> List[str]:
        """
        Returns a list of all pairs with ohlcv data available in this datadir
        for the specified timeframe
        :param datadir: Directory to search for ohlcv files
        :param timeframe: Timeframe to search pairs for
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :return: List of Pairs
        """
        candle = ""
        if candle_type != CandleType.SPOT:
            datadir = datadir.joinpath('futures')
            candle = f"-{candle_type}"
        ext = cls._get_file_extension()
        _tmp = [re.search(r'^(\S+)(?=\-' + timeframe + candle + f'.{ext})', p.name)
                for p in datadir.glob(f"*{timeframe}{candle}.{ext}")]
        # Check if regex found something and only return these results
        return [cls.rebuild_pair_from_filename(match[0]) for match in _tmp if match]

    @abstractmethod
    def ohlcv_store(
            self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType) -> None:
        """
        Store ohlcv data.
        :param pair: Pair - used to generate filename
        :param timeframe: Timeframe - used to generate filename
        :param data: Dataframe containing OHLCV data
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :return: None
        """

    def ohlcv_data_min_max(self, pair: str, timeframe: str,
                           candle_type: CandleType) -> Tuple[datetime, datetime, int]:
        """
        Returns the min and max timestamps for the given pair and timeframe.
        :param pair: Pair to get min/max for
        :param timeframe: Timeframe to get min/max for
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :return: (min, max, len)
        """
        df = self._ohlcv_load(pair, timeframe, None, candle_type)
        if df.empty:
            return (
                datetime.fromtimestamp(0, tz=timezone.utc),
                datetime.fromtimestamp(0, tz=timezone.utc),
                0,
            )
        return df.iloc[0]['date'].to_pydatetime(), df.iloc[-1]['date'].to_pydatetime(), len(df)

    @abstractmethod
    def _ohlcv_load(self, pair: str, timeframe: str, timerange: Optional[TimeRange],
                    candle_type: CandleType
                    ) -> DataFrame:
        """
        Internal method used to load data for one pair from disk.
        Implements the loading and conversion to a Pandas dataframe.
        Timerange trimming and dataframe validation happen outside of this method.
        :param pair: Pair to load data
        :param timeframe: Timeframe (e.g. "5m")
        :param timerange: Limit data to be loaded to this timerange.
                        Optionally implemented by subclasses to avoid loading
                        all data where possible.
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :return: DataFrame with ohlcv data, or empty DataFrame
        """

    def ohlcv_purge(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
        """
        Remove data for this pair
        :param pair: Delete data for this pair.
        :param timeframe: Timeframe (e.g. "5m")
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :return: True when deleted, false if file did not exist.
        """
        filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
        if filename.exists():
            filename.unlink()
            return True
        return False

    @abstractmethod
    def ohlcv_append(
        self,
        pair: str,
        timeframe: str,
        data: DataFrame,
        candle_type: CandleType
    ) -> None:
        """
        Append data to existing data structures
        :param pair: Pair
        :param timeframe: Timeframe this ohlcv data is for
        :param data: Data to append.
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        """

    @classmethod
    def trades_get_pairs(cls, datadir: Path) -> List[str]:
        """
        Returns a list of all pairs for which trade data is available in this datadir
        :param datadir: Directory to search for trades files
        :return: List of Pairs
        """
        _ext = cls._get_file_extension()
        _tmp = [re.search(r'^(\S+)(?=\-trades.' + _ext + ')', p.name)
                for p in datadir.glob(f"*trades.{_ext}")]
        # Check if regex found something and only return these results to avoid exceptions.
        return [cls.rebuild_pair_from_filename(match[0]) for match in _tmp if match]

    @abstractmethod
    def _trades_store(self, pair: str, data: DataFrame, trading_mode: TradingMode) -> None:
        """
        Store trades data (DataFrame) to file
        :param pair: Pair - used for filename
        :param data: Dataframe containing trades
                     column sequence as in DEFAULT_TRADES_COLUMNS
        :param trading_mode: Trading mode to use (used to determine the filename)
        """

    @abstractmethod
    def trades_append(self, pair: str, data: DataFrame):
        """
        Append data to existing files
        :param pair: Pair - used for filename
        :param data: Dataframe containing trades
                     column sequence as in DEFAULT_TRADES_COLUMNS
        """

    @abstractmethod
    def _trades_load(
        self, pair: str, trading_mode: TradingMode, timerange: Optional[TimeRange] = None
    ) -> DataFrame:
        """
        Load a pair from file, either .json.gz or .json
        :param pair: Load trades for this pair
        :param trading_mode: Trading mode to use (used to determine the filename)
        :param timerange: Timerange to load trades for - currently not implemented
        :return: Dataframe containing trades
        """

    def trades_store(self, pair: str, data: DataFrame, trading_mode: TradingMode) -> None:
        """
        Store trades data (DataFrame) to file
        :param pair: Pair - used for filename
        :param data: Dataframe containing trades
                     column sequence as in DEFAULT_TRADES_COLUMNS
        :param trading_mode: Trading mode to use (used to determine the filename)
        """
        # Filter on expected columns (will remove the actual date column).
        self._trades_store(pair, data[DEFAULT_TRADES_COLUMNS], trading_mode)

    def trades_purge(self, pair: str, trading_mode: TradingMode) -> bool:
        """
        Remove data for this pair
        :param pair: Delete data for this pair.
        :param trading_mode: Trading mode to use (used to determine the filename)
        :return: True when deleted, false if file did not exist.
        """
        filename = self._pair_trades_filename(self._datadir, pair, trading_mode)
        if filename.exists():
            filename.unlink()
            return True
        return False

    def trades_load(
            self, pair: str, trading_mode: TradingMode, timerange: Optional[TimeRange] = None
    ) -> DataFrame:
        """
        Load a pair from file, either .json.gz or .json
        Removes duplicates in the process.
        :param pair: Load trades for this pair
        :param trading_mode: Trading mode to use (used to determine the filename)
        :param timerange: Timerange to load trades for - currently not implemented
        :return: DataFrame containing trades
        """
        trades = trades_df_remove_duplicates(
            self._trades_load(pair, trading_mode, timerange=timerange)
        )

        trades = trades_convert_types(trades)
        return trades

    @classmethod
    def create_dir_if_needed(cls, datadir: Path):
        """
        Creates datadir if necessary
        Should only create directories for "futures" mode at the moment.
        """
        if not datadir.parent.is_dir():
            datadir.parent.mkdir()

    @classmethod
    def _pair_data_filename(
        cls,
        datadir: Path,
        pair: str,
        timeframe: str,
        candle_type: CandleType,
        no_timeframe_modify: bool = False
    ) -> Path:
        pair_s = misc.pair_to_filename(pair)
        candle = ""
        if not no_timeframe_modify:
            timeframe = cls.timeframe_to_file(timeframe)

        if candle_type != CandleType.SPOT:
            datadir = datadir.joinpath('futures')
            candle = f"-{candle_type}"
        filename = datadir.joinpath(
            f'{pair_s}-{timeframe}{candle}.{cls._get_file_extension()}')
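        # Illustrative result (assuming the feather handler, and that misc.pair_to_filename
        # maps "/" and ":" to "_"): ("ETH/USDT:USDT", "1h", CandleType.FUTURES) resolves to
        # <datadir>/futures/ETH_USDT_USDT-1h-futures.feather.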
        return filename

    @classmethod
    def _pair_trades_filename(cls, datadir: Path, pair: str, trading_mode: TradingMode) -> Path:
        pair_s = misc.pair_to_filename(pair)
        if trading_mode == TradingMode.FUTURES:
            # Futures pair ...
            datadir = datadir.joinpath('futures')

        filename = datadir.joinpath(f'{pair_s}-trades.{cls._get_file_extension()}')
        return filename

    @staticmethod
    def timeframe_to_file(timeframe: str):
        return timeframe.replace('M', 'Mo')

    @staticmethod
    def rebuild_timeframe_from_filename(timeframe: str) -> str:
        """
        Converts a timeframe from its on-disk (filename) form back to the original timeframe.
        Replaces "Mo" with "M" ("Mo" is used on disk to avoid problems on case-insensitive
        filesystems).
        """
        return re.sub('1mo', '1M', timeframe, flags=re.IGNORECASE)

    @staticmethod
    def rebuild_pair_from_filename(pair: str) -> str:
        """
        Rebuild pair name from filename
        Assumes an asset name of at most 7 characters to also support BTC-PERP and
        BTC-PERP:USD names.
        """
        res = re.sub(r'^(([A-Za-z\d]{1,10})|^([A-Za-z\-]{1,6}))(_)', r'\g<1>/', pair, count=1)
        res = re.sub('_', ':', res, count=1)
        return res

    def ohlcv_load(self, pair, timeframe: str,
                   candle_type: CandleType, *,
                   timerange: Optional[TimeRange] = None,
                   fill_missing: bool = True,
                   drop_incomplete: bool = False,
                   startup_candles: int = 0,
                   warn_no_data: bool = True,
                   ) -> DataFrame:
        """
        Load cached candle (OHLCV) data for the given pair.

        :param pair: Pair to load data for
        :param timeframe: Timeframe (e.g. "5m")
        :param timerange: Limit data to be loaded to this timerange
        :param fill_missing: Fill missing values with "No action"-candles
        :param drop_incomplete: Drop last candle assuming it may be incomplete.
        :param startup_candles: Additional candles to load at the start of the period
        :param warn_no_data: Log a warning message when no data is found
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        :return: DataFrame with ohlcv data, or empty DataFrame
        """
        # Fix startup period
        timerange_startup = deepcopy(timerange)
        if startup_candles > 0 and timerange_startup:
            timerange_startup.subtract_start(timeframe_to_seconds(timeframe) * startup_candles)

        pairdf = self._ohlcv_load(
            pair,
            timeframe,
            timerange=timerange_startup,
            candle_type=candle_type
        )
        if self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data):
            return pairdf
        else:
            enddate = pairdf.iloc[-1]['date']

            if timerange_startup:
                self._validate_pairdata(pair, pairdf, timeframe, candle_type, timerange_startup)
                pairdf = trim_dataframe(pairdf, timerange_startup)
                if self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data, True):
                    return pairdf

            # incomplete candles should only be dropped if we didn't trim the end beforehand.
            pairdf = clean_ohlcv_dataframe(pairdf, timeframe,
                                           pair=pair,
                                           fill_missing=fill_missing,
                                           drop_incomplete=(drop_incomplete and
                                                            enddate == pairdf.iloc[-1]['date']))
            self._check_empty_df(pairdf, pair, timeframe, candle_type, warn_no_data)
            return pairdf

    def _check_empty_df(
            self, pairdf: DataFrame, pair: str, timeframe: str, candle_type: CandleType,
            warn_no_data: bool, warn_price: bool = False) -> bool:
        """
        Warn on empty dataframe
        """
        if pairdf.empty:
            if warn_no_data:
                logger.warning(
                    f"No history for {pair}, {candle_type}, {timeframe} found. "
                    "Use `freqtrade download-data` to download the data"
                )
            return True
        elif warn_price:
            candle_price_gap = 0
            if (candle_type in (CandleType.SPOT, CandleType.FUTURES) and
                    not pairdf.empty
                    and 'close' in pairdf.columns and 'open' in pairdf.columns):
                # Detect gaps between prior close and open
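                # e.g. a candle closing at 100 followed by one opening at 112 gives
                # (112 - 100) / 100 = 0.12, i.e. a 12% gap, which exceeds the 10%
                # threshold checked below.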
                gaps = ((pairdf['open'] - pairdf['close'].shift(1)) / pairdf['close'].shift(1))
                gaps = gaps.dropna()
                if len(gaps):
                    candle_price_gap = max(abs(gaps))
            if candle_price_gap > 0.1:
                logger.info(f"Price jump in {pair}, {timeframe}, {candle_type} between two candles "
                            f"of {candle_price_gap:.2%} detected.")

        return False

    def _validate_pairdata(self, pair, pairdata: DataFrame, timeframe: str,
                           candle_type: CandleType, timerange: TimeRange):
        """
        Validates pairdata for missing data at start and end and logs warnings.
        :param pairdata: Dataframe to validate
        :param timerange: Timerange specified for start and end dates
        """

        if timerange.starttype == 'date':
            if pairdata.iloc[0]['date'] > timerange.startdt:
                logger.warning(f"{pair}, {candle_type}, {timeframe}, "
                               f"data starts at {pairdata.iloc[0]['date']:%Y-%m-%d %H:%M:%S}")
        if timerange.stoptype == 'date':
            if pairdata.iloc[-1]['date'] < timerange.stopdt:
                logger.warning(f"{pair}, {candle_type}, {timeframe}, "
                               f"data ends at {pairdata.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}")

    def rename_futures_data(
            self, pair: str, new_pair: str, timeframe: str, candle_type: CandleType):
        """
        Temporary method to migrate data from old naming to new naming (BTC/USDT -> BTC/USDT:USDT)
        Only used for binance to support the binance futures naming unification.
        """

        file_old = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
        file_new = self._pair_data_filename(self._datadir, new_pair, timeframe, candle_type)
        # print(file_old, file_new)
        if file_new.exists():
            logger.warning(f"{file_new} exists already, can't migrate {pair}.")
            return
        file_old.rename(file_new)

    def fix_funding_fee_timeframe(self, ff_timeframe: str):
        """
        Temporary method to migrate data from old funding fee timeframe to the correct timeframe
        Applies to bybit and okx, where funding-fee and mark candles have different timeframes.
        """
        paircombs = self.ohlcv_get_available_data(self._datadir, TradingMode.FUTURES)
        funding_rate_combs = [
            f for f in paircombs if f[2] == CandleType.FUNDING_RATE and f[1] != ff_timeframe
        ]

        if funding_rate_combs:
            logger.warning(
                f'Migrating {len(funding_rate_combs)} funding fees to correct timeframe.')

        for pair, timeframe, candletype in funding_rate_combs:
            old_name = self._pair_data_filename(self._datadir, pair, timeframe, candletype)
            new_name = self._pair_data_filename(self._datadir, pair, ff_timeframe, candletype)

            if not Path(old_name).exists():
                logger.warning(f'{old_name} does not exist, skipping.')
                continue

            if Path(new_name).exists():
                logger.warning(f'{new_name} already exists, Removing.')
                Path(new_name).unlink()

            Path(old_name).rename(new_name)


def get_datahandlerclass(datatype: str) -> Type[IDataHandler]:
    """
    Get datahandler class.
    Could be done using Resolvers, but since this may be called often and resolvers
    are rather expensive, doing this directly should improve performance.
    :param datatype: datatype to use.
    :return: Datahandler class
    """

    if datatype == 'json':
        from .jsondatahandler import JsonDataHandler
        return JsonDataHandler
    elif datatype == 'jsongz':
        from .jsondatahandler import JsonGzDataHandler
        return JsonGzDataHandler
    elif datatype == 'hdf5':
        from .hdf5datahandler import HDF5DataHandler
        return HDF5DataHandler
    elif datatype == 'feather':
        from .featherdatahandler import FeatherDataHandler
        return FeatherDataHandler
    elif datatype == 'parquet':
        from .parquetdatahandler import ParquetDataHandler
        return ParquetDataHandler
    else:
        raise ValueError(f"No datahandler for datatype {datatype} available.")


def get_datahandler(datadir: Path, data_format: Optional[str] = None,
                    data_handler: Optional[IDataHandler] = None) -> IDataHandler:
    """
    :param datadir: Folder to save data
    :param data_format: dataformat to use
    :param data_handler: returns this datahandler if it exists or initializes a new one
    """

    if not data_handler:
        HandlerClass = get_datahandlerclass(data_format or 'feather')
        data_handler = HandlerClass(datadir)
    return data_handler
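

# Illustrative usage sketch (not part of the module's API; the data directory below is
# hypothetical and assumes OHLCV data was previously downloaded in the default feather format):
if __name__ == "__main__":
    _dh = get_datahandler(Path("user_data/data/binance"))
    _df = _dh.ohlcv_load("BTC/USDT", "5m", CandleType.SPOT)
    print(_df.tail())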