• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ricequant / rqalpha / 17906102104

22 Sep 2025 06:02AM UTC coverage: 65.213% (+0.03%) from 65.183%
17906102104

push

github

web-flow
Rqsdk 815 (#938)

* Remove debug print statement from the plot function in the system analyser module.

* Add debug logging for Matplotlib backend in plot function of system analyser module

* add FutureArbitrage

* update version

* update

* update

---------

Co-authored-by: 崔子琦 <oscarcui@ricequant.com>
Co-authored-by: Cuizi7 <Cuizi7@users.noreply.github.com>
Co-authored-by: pitaya <wangjiangfeng77qq@163.com>
Co-authored-by: Don <lin.dongzhao@ricequant.com>

8 of 13 new or added lines in 5 files covered. (61.54%)

128 existing lines in 5 files now uncovered.

6790 of 10412 relevant lines covered (65.21%)

4.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.22
/rqalpha/data/base_data_source/data_source.py
1
# -*- coding: utf-8 -*-
2
# 版权所有 2020 深圳米筐科技有限公司(下称“米筐科技”)
3
#
4
# 除非遵守当前许可,否则不得使用本软件。
5
#
6
#     * 非商业用途(非商业用途指个人出于非商业目的使用本软件,或者高校、研究所等非营利机构出于教育、科研等目的使用本软件):
7
#         遵守 Apache License 2.0(下称“Apache 2.0 许可”),
8
#         您可以在以下位置获得 Apache 2.0 许可的副本:http://www.apache.org/licenses/LICENSE-2.0。
9
#         除非法律有要求或以书面形式达成协议,否则本软件分发时需保持当前许可“原样”不变,且不得附加任何条件。
10
#
11
#     * 商业用途(商业用途指个人出于任何商业目的使用本软件,或者法人或其他组织出于任何目的使用本软件):
12
#         未经米筐科技授权,任何个人不得出于任何商业目的使用本软件(包括但不限于向第三方提供、销售、出租、出借、转让本软件、
13
#         本软件的衍生产品、引用或借鉴了本软件功能或源代码的产品或服务),任何法人或其他组织不得出于任何目的使用本软件,
14
#         否则米筐科技有权追究相应的知识产权侵权责任。
15
#         在此前提下,对本软件的使用同样需要遵守 Apache 2.0 许可,Apache 2.0 许可与本许可冲突之处,以本许可为准。
16
#         详细的授权流程,请联系 public@ricequant.com 获取。
17
import os
7✔
18
import pickle
7✔
19
from datetime import date, datetime, timedelta
7✔
20
from itertools import groupby
7✔
21
from typing import Dict, Iterable, List, Optional, Sequence, Union
7✔
22

23
import numpy as np
7✔
24
import pandas as pd
7✔
25
import six
7✔
26
from rqalpha.utils.i18n import gettext as _
7✔
27
from rqalpha.const import INSTRUMENT_TYPE, TRADING_CALENDAR_TYPE
7✔
28
from rqalpha.interface import AbstractDataSource
7✔
29
from rqalpha.model.instrument import Instrument
7✔
30
from rqalpha.utils.datetime_func import (convert_date_to_int, convert_int_to_date, convert_int_to_datetime)
7✔
31
from rqalpha.utils.exception import RQInvalidArgument
7✔
32
from rqalpha.utils.functools import lru_cache
7✔
33
from rqalpha.utils.typing import DateLike
7✔
34
from rqalpha.utils.logger import system_log
7✔
35
from rqalpha.environment import Environment
7✔
36
from rqalpha.data.base_data_source.adjust import FIELDS_REQUIRE_ADJUSTMENT, adjust_bars
7✔
37
from rqalpha.data.base_data_source.storage_interface import (AbstractCalendarStore, AbstractDateSet,
7✔
38
                                AbstractDayBarStore, AbstractDividendStore,
39
                                AbstractInstrumentStore)
40
from rqalpha.data.base_data_source.storages import (DateSet, DayBarStore, DividendStore,
7✔
41
                       ExchangeTradingCalendarStore, FutureDayBarStore,
42
                       FutureInfoStore,InstrumentStore,
43
                       ShareTransformationStore, SimpleFactorStore,
44
                       YieldCurveStore, FuturesTradingParameters)
45

46

47
# Aggregation method applied to each day-bar field when daily bars are
# resampled into weekly bars (see BaseDataSource.resample_week_bars).
# Values follow pandas DataFrame.resample().agg() semantics: the week's
# open is the first daily open, the close is the last daily close,
# volume/turnover are summed, etc.  Fields not listed here are dropped
# during resampling.
BAR_RESAMPLE_FIELD_METHODS = {
    "open": "first",
    "close": "last",
    "iopv": "last",
    "high": "max",
    "low": "min",
    "total_turnover": "sum",
    "volume": "sum",
    "num_trades": "sum",
    "acc_net_value": "last",
    "unit_net_value": "last",
    "discount_rate": "last",
    "settlement": "last",
    "prev_settlement": "last",
    "open_interest": "last",
    "basis_spread": "last",
    "contract_multiplier": "last",
    "strike_price": "last",
}
66

67

68
class BaseDataSource(AbstractDataSource):
    """
    Data source backed by a local RQAlpha bundle directory.

    Reads day bars, instruments, dividends, split/ex-cum factors, trading
    calendars, the yield curve and suspension/ST day sets from files in
    the bundle, and exposes them through the AbstractDataSource interface.
    Additional stores can be plugged in via the ``register_*`` methods.
    """

    # Instrument types wired up out of the box in __init__; further types
    # can be added later through register_instruments_store().
    DEFAULT_INS_TYPES = (
        INSTRUMENT_TYPE.CS, INSTRUMENT_TYPE.FUTURE, INSTRUMENT_TYPE.ETF, INSTRUMENT_TYPE.LOF, INSTRUMENT_TYPE.INDX,
        INSTRUMENT_TYPE.PUBLIC_FUND, INSTRUMENT_TYPE.REITs
    )
73

74
    def __init__(self, path: str, custom_future_info: dict, *args, **kwargs) -> None:
        """
        Load every bundle file under ``path`` and build the data stores.

        :param path: bundle directory holding the h5/pk/json/npy data files
        :param custom_future_info: user overrides merged into future_info.json
        :raises RuntimeError: if ``path`` does not exist
        """
        if not os.path.exists(path):
            raise RuntimeError('bundle path {} not exist'.format(os.path.abspath(path)))

        def _p(name):
            # Shorthand: absolute path of a file inside the bundle.
            return os.path.join(path, name)

        # ETF / LOF / REITs all share a single day-bar file.
        funds_day_bar_store = DayBarStore(_p('funds.h5'))
        self._day_bars = {
            INSTRUMENT_TYPE.CS: DayBarStore(_p('stocks.h5')),
            INSTRUMENT_TYPE.INDX: DayBarStore(_p('indexes.h5')),
            INSTRUMENT_TYPE.FUTURE: FutureDayBarStore(_p('futures.h5')),
            INSTRUMENT_TYPE.ETF: funds_day_bar_store,
            INSTRUMENT_TYPE.LOF: funds_day_bar_store,
            INSTRUMENT_TYPE.REITs: funds_day_bar_store
        }  # type: Dict[INSTRUMENT_TYPE, AbstractDayBarStore]

        self._future_info_store = FutureInfoStore(_p("future_info.json"), custom_future_info)

        self._instruments_stores = {}  # type: Dict[INSTRUMENT_TYPE, AbstractInstrumentStore]
        self._ins_id_or_sym_type_map = {}  # type: Dict[str, INSTRUMENT_TYPE]
        instruments = []

        # removed: unused local `env = Environment.get_instance()`
        unsupported_types = []
        with open(_p('instruments.pk'), 'rb') as f:
            for i in pickle.load(f):
                if i["type"] == "Future" and Instrument.is_future_continuous_contract(i["order_book_id"]):
                    # Continuous contracts have no real listing date; pin an early one.
                    i["listed_date"] = datetime(1990, 1, 1)
                if i["type"] not in INSTRUMENT_TYPE:
                    # Warn only once per unknown type, then skip the instrument.
                    if i["type"] not in unsupported_types:
                        unsupported_types.append(i["type"])
                        system_log.warning(f"Unsupported type: {i['type']}")
                    continue
                instruments.append(Instrument(
                    i,
                    lambda i: self._future_info_store.get_tick_size(i),
                    ))
        for ins_type in self.DEFAULT_INS_TYPES:
            self.register_instruments_store(InstrumentStore(instruments, ins_type))

        # CS / ETF / LOF share one dividend file.
        dividend_store = DividendStore(_p('dividends.h5'))
        self._dividends = {
            INSTRUMENT_TYPE.CS: dividend_store,
            INSTRUMENT_TYPE.ETF: dividend_store,
            INSTRUMENT_TYPE.LOF: dividend_store,
        }

        self._calendar_providers = {
            TRADING_CALENDAR_TYPE.EXCHANGE: ExchangeTradingCalendarStore(_p("trading_dates.npy"))
        }

        self._yield_curve = YieldCurveStore(_p('yield_curve.h5'))

        # CS / ETF / LOF share one split-factor file as well.
        split_store = SimpleFactorStore(_p('split_factor.h5'))
        self._split_factors = {
            INSTRUMENT_TYPE.CS: split_store,
            INSTRUMENT_TYPE.ETF: split_store,
            INSTRUMENT_TYPE.LOF: split_store,
        }
        self._ex_cum_factor = SimpleFactorStore(_p('ex_cum_factor.h5'))
        self._share_transformation = ShareTransformationStore(_p('share_transformation.json'))

        self._suspend_days = [DateSet(_p('suspended_days.h5'))]  # type: List[AbstractDateSet]
        self._st_stock_days = DateSet(_p('st_stock_days.h5'))
138

139
    def register_day_bar_store(self, instrument_type, store):
        #  type: (INSTRUMENT_TYPE, AbstractDayBarStore) -> None
        """Install (or replace) the day-bar store used for *instrument_type*."""
        self._day_bars[instrument_type] = store
142

143
    def register_instruments_store(self, instruments_store):
        # type: (AbstractInstrumentStore) -> None
        """Register an instrument store and index every id/symbol it knows by type."""
        ins_type = instruments_store.instrument_type
        self._ins_id_or_sym_type_map.update(
            (id_or_sym, ins_type) for id_or_sym in instruments_store.all_id_and_syms
        )
        self._instruments_stores[ins_type] = instruments_store
149

150
    def register_dividend_store(self, instrument_type, dividend_store):
        # type: (INSTRUMENT_TYPE, AbstractDividendStore) -> None
        """Install (or replace) the dividend store used for *instrument_type*."""
        self._dividends[instrument_type] = dividend_store
153

154
    def register_split_store(self, instrument_type, split_store):
        # type: (INSTRUMENT_TYPE, AbstractSimpleFactorStore) -> None -- NOTE(review): store type inferred from usage; confirm
        """Install (or replace) the split-factor store used for *instrument_type*."""
        self._split_factors[instrument_type] = split_store
156

157
    def register_calendar_store(self, calendar_type, calendar_store):
        # type: (TRADING_CALENDAR_TYPE, AbstractCalendarStore) -> None
        """Install (or replace) the trading-calendar provider for *calendar_type*."""
        self._calendar_providers[calendar_type] = calendar_store
160

161
    def append_suspend_date_set(self, date_set):
        # type: (AbstractDateSet) -> None
        """Add another suspension date set; consulted after the built-in one (see is_suspended)."""
        self._suspend_days.append(date_set)
164

165
    @lru_cache(2048)
    def get_dividend(self, instrument):
        """
        Return dividend records for *instrument*, or None when no dividend
        store is registered for its instrument type.
        """
        store = self._dividends.get(instrument.type)
        if store is None:
            return None
        return store.get_dividend(instrument.order_book_id)
173

174
    def get_trading_minutes_for(self, order_book_id, trading_dt):
        # Minute-level data is not part of the base bundle.
        raise NotImplementedError
176

177
    def get_trading_calendars(self):
        # type: () -> Dict[TRADING_CALENDAR_TYPE, pd.DatetimeIndex]
        """Return the trading calendar of every registered calendar provider."""
        calendars = {}
        for calendar_type, provider in self._calendar_providers.items():
            calendars[calendar_type] = provider.get_trading_calendar()
        return calendars
180

181
    def get_instruments(self, id_or_syms=None, types=None):
        # type: (Optional[Iterable[str]], Optional[Iterable[INSTRUMENT_TYPE]]) -> Iterable[Instrument]
        """
        Yield Instrument objects.

        :param id_or_syms: order_book_ids or symbols to fetch; when given,
            *types* is ignored and the ids are grouped by their registered type
        :param types: instrument types to enumerate in full; None means every
            registered store
        """
        if id_or_syms is not None:
            # groupby only groups consecutive items, so the ids must be
            # sorted by the same key function first.
            # NOTE(review): ids unknown to the map get key None — if the
            # INSTRUMENT_TYPE members are not orderable against None the
            # sort could raise; confirm callers pre-validate ids.
            ins_type_getter = lambda i: self._ins_id_or_sym_type_map.get(i)
            type_id_iter = groupby(sorted(id_or_syms, key=ins_type_getter), key=ins_type_getter)
        else:
            # (type, None) pairs: None means "all instruments of that type".
            type_id_iter = ((t, None) for t in types or self._instruments_stores.keys())
        for ins_type, id_or_syms in type_id_iter:
            if ins_type is not None and ins_type in self._instruments_stores:
                yield from self._instruments_stores[ins_type].get_instruments(id_or_syms)
191

192
    def get_share_transformation(self, order_book_id):
        """Return share-transformation info (e.g. B-to-A conversion) for *order_book_id*."""
        return self._share_transformation.get_share_transformation(order_book_id)
194

195
    def is_suspended(self, order_book_id, dates):
        # type: (str, Sequence[DateLike]) -> List[bool]
        """
        Return, for each date in *dates*, whether the instrument was suspended.

        Date sets are consulted in registration order; the first set that
        knows the instrument wins.  Instruments unknown to every set are
        reported as never suspended.
        """
        for date_set in self._suspend_days:
            result = date_set.contains(order_book_id, dates)
            if result is not None:
                return result
        # No date set knows this instrument.  (The previous `for ... else`
        # was misleading: without a `break` the else-branch always ran, so
        # this plain fallthrough is exactly equivalent.)
        return [False] * len(dates)
203

204
    def is_st_stock(self, order_book_id, dates):
        """Return, for each date in *dates*, whether the stock carried ST status."""
        flags = self._st_stock_days.contains(order_book_id, dates)
        if flags is None:
            # Unknown instrument: treat as never ST.
            flags = [False] * len(dates)
        return flags
207

208
    @lru_cache(None)
    def _all_day_bars_of(self, instrument):
        """All day bars of *instrument* straight from its store (cached per instrument)."""
        store = self._day_bars[instrument.type]
        return store.get_bars(instrument.order_book_id)
211

212
    @lru_cache(None)
    def _filtered_day_bars(self, instrument):
        """Day bars of *instrument* with zero-volume (non-traded) rows dropped (cached)."""
        all_bars = self._all_day_bars_of(instrument)
        traded_mask = all_bars['volume'] > 0
        return all_bars[traded_mask]
216

217
    def get_bar(self, instrument, dt, frequency):
        # type: (Instrument, Union[datetime, date], str) -> Optional[np.ndarray]
        """
        Return the day bar of *instrument* on trading date *dt*, or None.

        Only '1d' frequency is supported by the base data source.

        :raises NotImplementedError: for any frequency other than '1d'
        """
        if frequency != '1d':
            raise NotImplementedError

        bars = self._all_day_bars_of(instrument)
        if len(bars) <= 0:
            # Unified with the branch below: always `return None` explicitly
            # (the original mixed a bare `return` with `return None`).
            return None
        dt = np.uint64(convert_date_to_int(dt))
        pos = bars['datetime'].searchsorted(dt)
        if pos >= len(bars) or bars['datetime'][pos] != dt:
            # dt is not a bar date of this instrument.
            return None

        return bars[pos]
231

232
    # Day-bar fields copied into the open-auction snapshot built by
    # get_open_auction_bar (which also derives a "last" entry from "open").
    OPEN_AUCTION_BAR_FIELDS = ["datetime", "open", "limit_up", "limit_down", "volume", "total_turnover"]
233

234
    def get_open_auction_bar(self, instrument, dt):
        # type: (Instrument, Union[datetime, date]) -> Dict
        """Build the open-auction snapshot of *instrument* on *dt* from its day bar."""
        day_bar = self.get_bar(instrument, dt, "1d")
        if day_bar is None:
            bar = dict.fromkeys(self.OPEN_AUCTION_BAR_FIELDS, np.nan)
        else:
            available = day_bar.dtype.names
            bar = {
                field: day_bar[field] if field in available else np.nan
                for field in self.OPEN_AUCTION_BAR_FIELDS
            }
        # During the open auction the latest price is the open price.
        bar["last"] = bar["open"]
        return bar
243

244
    def get_settle_price(self, instrument, date):
        """Settlement price of *instrument* on *date*; NaN when no bar exists."""
        day_bar = self.get_bar(instrument, date, '1d')
        return np.nan if day_bar is None else day_bar['settlement']
249

250
    @staticmethod
    def _are_fields_valid(fields, valid_fields):
        """
        Check that *fields* only names fields present in *valid_fields*.

        :param fields: None (always valid), a single field name, or an
            iterable of field names
        :param valid_fields: container of the field names that exist
        """
        if fields is None:
            return True
        # The file already requires Python 3 (f-strings elsewhere), so a
        # plain `str` check replaces the legacy `six.string_types`.
        if isinstance(fields, str):
            return fields in valid_fields
        return all(field in valid_fields for field in fields)
260

261
    def get_ex_cum_factor(self, order_book_id):
        """Cumulative ex-dividend adjustment factors for *order_book_id* (used by adjust_bars)."""
        return self._ex_cum_factor.get_factors(order_book_id)
263

264
    def _update_weekly_trading_date_index(self, idx):
        """Snap a weekly bar label to a trading date (itself, or the previous trading date)."""
        data_proxy = Environment.get_instance().data_proxy
        if data_proxy.is_trading_date(idx):
            return idx
        return data_proxy.get_previous_trading_date(idx)
269

270
    def resample_week_bars(self, bars, bar_count, fields):
        """
        Aggregate daily *bars* into at most *bar_count* weekly bars.

        Weeks are labelled by their Friday ('W-Fri'), then each label is
        snapped back onto an actual trading date.  Only fields with a known
        aggregation method (BAR_RESAMPLE_FIELD_METHODS) survive the resample.

        :param bars: structured day-bar array with an int 'datetime' column
        :param bar_count: maximum number of weekly bars to keep (from the end)
        :param fields: field name or list of field names to aggregate
        :return: numpy recarray of weekly bars indexed by int datetime
        """
        df_bars = pd.DataFrame(bars)
        df_bars['datetime'] = df_bars.apply(lambda x: convert_int_to_datetime(x['datetime']), axis=1)
        df_bars = df_bars.set_index('datetime')
        needed_fields = fields  # local typo fixed (was "nead_fields")
        if isinstance(needed_fields, str):
            needed_fields = [needed_fields]
        hows = {field: BAR_RESAMPLE_FIELD_METHODS[field] for field in needed_fields if field in BAR_RESAMPLE_FIELD_METHODS}
        df_bars = df_bars.resample('W-Fri').agg(hows)
        # Friday may not be a trading day; move each week label onto one,
        # then drop duplicates that the snapping may have produced.
        df_bars.index = df_bars.index.map(self._update_weekly_trading_date_index)
        df_bars = df_bars[~df_bars.index.duplicated(keep='first')]
        df_bars.sort_index(inplace=True)
        df_bars = df_bars[-bar_count:]
        df_bars = df_bars.reset_index()
        # Back to the bundle's integer datetime representation.
        df_bars['datetime'] = df_bars.apply(lambda x: np.uint64(convert_date_to_int(x['datetime'].date())), axis=1)
        df_bars = df_bars.set_index('datetime')
        bars = df_bars.to_records()
        return bars
288

289
    def history_bars(self, instrument, bar_count, frequency, fields, dt,
                     skip_suspended=True, include_now=False,
                     adjust_type='pre', adjust_orig=None):
        """
        Return up to *bar_count* historical bars of *instrument* ending at *dt*.

        :param frequency: '1d' (daily) or '1w' (weekly, resampled from daily)
        :param fields: None (all fields), a field name, or a list of names
        :param skip_suspended: drop zero-volume bars (stocks only)
        :param include_now: for '1w', include the (possibly partial) current week
        :param adjust_type: 'pre'/'post'/'none' price adjustment
        :param adjust_orig: reference datetime for the adjustment
        :raises NotImplementedError: for other frequencies
        :raises RQInvalidArgument: if *fields* names an unknown field
        """
        if frequency != '1d' and frequency != '1w':
            raise NotImplementedError

        if skip_suspended and instrument.type == 'CS':
            bars = self._filtered_day_bars(instrument)
        else:
            bars = self._all_day_bars_of(instrument)

        if not self._are_fields_valid(fields, bars.dtype.names):
            raise RQInvalidArgument("invalid fields: {}".format(fields))

        if len(bars) <= 0:
            return bars

        if frequency == '1w':
            if include_now:
                dt = np.uint64(convert_date_to_int(dt))
                i = bars['datetime'].searchsorted(dt, side='right')
            else:
                # Exclude the current (incomplete) week: cut before Monday.
                monday = dt - timedelta(days=dt.weekday())
                monday = np.uint64(convert_date_to_int(monday))
                i = bars['datetime'].searchsorted(monday, side='left')

            # A week has at most 5 trading days, so bar_count * 5 daily bars
            # are enough to build bar_count weekly bars.
            left = i - bar_count * 5 if i >= bar_count * 5 else 0
            bars = bars[left:i]

            if adjust_type == 'none' or instrument.type in {'Future', 'INDX'}:
                # Futures and indexes need no price adjustment.
                week_bars = self.resample_week_bars(bars, bar_count, fields)
                return week_bars if fields is None else week_bars[fields]

            if isinstance(fields, str) and fields not in FIELDS_REQUIRE_ADJUSTMENT:
                week_bars = self.resample_week_bars(bars, bar_count, fields)
                return week_bars if fields is None else week_bars[fields]

            # Adjust the daily bars first, then resample to weeks.
            adjust_bars_date = adjust_bars(bars, self.get_ex_cum_factor(instrument.order_book_id),
                                           fields, adjust_type, adjust_orig)
            adjust_week_bars = self.resample_week_bars(adjust_bars_date, bar_count, fields)
            return adjust_week_bars if fields is None else adjust_week_bars[fields]
        # Daily path: take the bar_count bars ending at dt (inclusive).
        dt = np.uint64(convert_date_to_int(dt))
        i = bars['datetime'].searchsorted(dt, side='right')
        left = i - bar_count if i >= bar_count else 0
        bars = bars[left:i]
        if adjust_type == 'none' or instrument.type in {'Future', 'INDX'}:
            # Futures and indexes need no price adjustment.
            return bars if fields is None else bars[fields]

        if isinstance(fields, str) and fields not in FIELDS_REQUIRE_ADJUSTMENT:
            return bars if fields is None else bars[fields]

        bars = adjust_bars(bars, self.get_ex_cum_factor(instrument.order_book_id),
                           fields, adjust_type, adjust_orig)

        return bars if fields is None else bars[fields]
347

348
    def current_snapshot(self, instrument, frequency, dt):
        # Real-time snapshots are not available from the bundle-backed base source.
        raise NotImplementedError
350

351
    @lru_cache(2048)
    def get_split(self, instrument):
        """
        Return split factors for *instrument*, or None when no split store
        is registered for its instrument type.
        """
        try:
            # Local name typo fixed (was "splilt_store").
            split_store = self._split_factors[instrument.type]
        except KeyError:
            return None

        return split_store.get_factors(instrument.order_book_id)
359

360
    def available_data_range(self, frequency):
        """
        Date range for which bar data is available.

        Uses the Shanghai composite index (000001.XSHG) day bars as a proxy
        for the whole bundle's coverage.

        NOTE(review): for 'tick'/'1d' a (start, end) date pair is returned;
        any other frequency implicitly returns None when a stock or future
        account is configured — confirm callers handle that.
        """
        # FIXME
        from rqalpha.const import DEFAULT_ACCOUNT_TYPE
        accounts = Environment.get_instance().config.base.accounts
        if not (DEFAULT_ACCOUNT_TYPE.STOCK in accounts or DEFAULT_ACCOUNT_TYPE.FUTURE in accounts):
            # No constrained account: pretend everything is available.
            return date.min, date.max
        if frequency in ['tick', '1d']:
            s, e = self._day_bars[INSTRUMENT_TYPE.INDX].get_date_range('000001.XSHG')
            return convert_int_to_date(s).date(), convert_int_to_date(e).date()
369

370
    def get_yield_curve(self, start_date, end_date, tenor=None):
        """Return yield-curve data between the two dates, optionally for a single tenor."""
        return self._yield_curve.get_yield_curve(start_date, end_date, tenor=tenor)
372

373
    @lru_cache(1024)
    def get_futures_trading_parameters(self, instrument: Instrument, dt: date) -> FuturesTradingParameters:
        """Trading parameters of a future contract on *dt*."""
        # Annotation fixed: `datetime.date` here resolved to the *method*
        # of the imported `datetime` class, not the date type.
        # NOTE(review): `dt` only contributes to the lru_cache key — the
        # returned info does not depend on it in this implementation;
        # confirm that is intended.
        return self._future_info_store.get_future_info(instrument.order_book_id, instrument.underlying_symbol)
376

377
    def get_merge_ticks(self, order_book_id_list, trading_date, last_dt=None):
        # Tick data is not part of the base bundle.
        raise NotImplementedError
379

380
    def history_ticks(self, instrument, count, dt):
        # Tick data is not part of the base bundle.
        raise NotImplementedError
382

383
    def get_algo_bar(self, id_or_ins, start_min, end_min, dt):
        # Algo (TWAP/VWAP) bars require the commercial data source.
        raise NotImplementedError("open source rqalpha not support algo order")
385

386
    def get_open_auction_volume(self, instrument, dt):
        # type: (Instrument, datetime.datetime) -> float
        """Volume traded during the open auction of *instrument* on *dt*."""
        return self.get_open_auction_bar(instrument, dt)['volume']
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc