• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ricequant / rqalpha / 21350729672

26 Jan 2026 08:17AM UTC coverage: 67.675%. First build
21350729672

Pull #961

github

web-flow
exclude tests (#965)

* exclude tests

* update

---------

Co-authored-by: Don <lin.dongzhao@ricequant.com>
Pull Request #961: Develop

501 of 585 new or added lines in 26 files covered. (85.64%)

7428 of 10976 relevant lines covered (67.67%)

5.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.47
/rqalpha/data/data_proxy.py
1
# -*- coding: utf-8 -*-
2
# 版权所有 2019 深圳米筐科技有限公司(下称“米筐科技”)
3
#
4
# 除非遵守当前许可,否则不得使用本软件。
5
#
6
#     * 非商业用途(非商业用途指个人出于非商业目的使用本软件,或者高校、研究所等非营利机构出于教育、科研等目的使用本软件):
7
#         遵守 Apache License 2.0(下称“Apache 2.0 许可”),
8
#         您可以在以下位置获得 Apache 2.0 许可的副本:http://www.apache.org/licenses/LICENSE-2.0。
9
#         除非法律有要求或以书面形式达成协议,否则本软件分发时需保持当前许可“原样”不变,且不得附加任何条件。
10
#
11
#     * 商业用途(商业用途指个人出于任何商业目的使用本软件,或者法人或其他组织出于任何目的使用本软件):
12
#         未经米筐科技授权,任何个人不得出于任何商业目的使用本软件(包括但不限于向第三方提供、销售、出租、出借、转让本软件、
13
#         本软件的衍生产品、引用或借鉴了本软件功能或源代码的产品或服务),任何法人或其他组织不得出于任何目的使用本软件,
14
#         否则米筐科技有权追究相应的知识产权侵权责任。
15
#         在此前提下,对本软件的使用同样需要遵守 Apache 2.0 许可,Apache 2.0 许可与本许可冲突之处,以本许可为准。
16
#         详细的授权流程,请联系 public@ricequant.com 获取。
17

18
from datetime import datetime, date
8✔
19
from typing import Union, List, Sequence, Optional, Tuple, Iterable, Dict, Callable
8✔
20

21
import numpy as np
8✔
22
import pandas as pd
8✔
23

24
from rqalpha.const import INSTRUMENT_TYPE, EXECUTION_PHASE, MARKET
8✔
25
from rqalpha.utils import risk_free_helper, TimeRange, merge_trading_period
8✔
26
from rqalpha.data.trading_dates_mixin import TradingDatesMixin
8✔
27
from rqalpha.model.bar import BarObject, NANDict, PartialBarObject
8✔
28
from rqalpha.model.tick import TickObject
8✔
29
from rqalpha.model.instrument import Instrument
8✔
30
from rqalpha.model.order import ALGO_ORDER_STYLES
8✔
31
from rqalpha.utils.functools import lru_cache
8✔
32
from rqalpha.utils.datetime_func import convert_int_to_datetime
8✔
33
from rqalpha.utils.typing import DateLike, StrOrIter
8✔
34
from rqalpha.utils.i18n import gettext as _
8✔
35
from rqalpha.interface import AbstractDataSource, AbstractPriceBoard, ExchangeRate
8✔
36
from rqalpha.core.execution_context import ExecutionContext
8✔
37
from rqalpha.utils.typing import DateLike
8✔
38
from rqalpha.utils.exception import InstrumentNotFound
8✔
39

40
from .instruments_mixin import InstrumentsMixin
8✔
41

42
class DataProxy(TradingDatesMixin, InstrumentsMixin):
8✔
43
    def __init__(self, data_source: AbstractDataSource, price_board: AbstractPriceBoard):
8✔
44
        self._data_source = data_source
8✔
45
        self._price_board = price_board
8✔
46
        TradingDatesMixin.__init__(self, data_source)
8✔
47
        InstrumentsMixin.__init__(self, data_source)
8✔
48

49
    def __getattr__(self, item):
8✔
50
        return getattr(self._data_source, item)
8✔
51

52
    def get_trading_minutes_for(self, order_book_id, dt):
8✔
53
        instrument = self.instruments(order_book_id)
×
54
        minutes = self._data_source.get_trading_minutes_for(instrument, dt)
×
55
        return [] if minutes is None else minutes
×
56

57
    def get_yield_curve(self, start_date, end_date, tenor=None):
8✔
58
        if isinstance(tenor, str):
8✔
59
            tenor = [tenor]
×
60
        return self._data_source.get_yield_curve(start_date, end_date, tenor)
8✔
61

62
    def get_risk_free_rate(self, start_date, end_date):
8✔
63
        tenors = risk_free_helper.get_tenors_for(start_date, end_date)
8✔
64
        # 为何取 start_date 当日的?表示 start_date 时借入资金、end_date 归还的成本
65
        _s = start_date if self.is_trading_date(start_date) else self.get_next_trading_date(start_date, n=1)
8✔
66
        yc = self._data_source.get_yield_curve(_s, _s)
8✔
67
        if yc is None or yc.empty:
8✔
68
            return np.nan
×
69
        yc = yc.iloc[0]
8✔
70
        for tenor in tenors[::-1]:
8✔
71
            rate = yc.get(tenor)
8✔
72
            if rate and not np.isnan(rate):
8✔
73
                return rate
8✔
74
        else:
75
            return np.nan
×
76

77
    def get_dividend(self, order_book_id: str) -> Optional[np.ndarray]:
8✔
78
        """
79
        获取股票/基金分红信息
80

81
        :param str order_book_id: 合约代码
82
        
83
        :return: `numpy.ndarray` | `None`
84
            返回分红信息的结构化数组,包含以下字段:
85
            
86
            =========================   ===================================================
87
            字段名                       描述  
88
            =========================   ===================================================
89
            book_closure_date           股权登记日,格式为 YYYYMMDD 的整数
90
            announcement_date           公告日期,格式为 YYYYMMDD 的整数  
91
            dividend_cash_before_tax    税前现金分红,单位为元
92
            ex_dividend_date            除权除息日,格式为 YYYYMMDD 的整数
93
            payable_date                分红派息日,格式为 YYYYMMDD 的整数
94
            round_lot                   分红最小单位,例如:10 代表每 10 股派发
95
            =========================   ===================================================
96
            
97
            如果该合约没有分红记录,则返回 None
98
        """
99
        instrument = self.instrument_not_none(order_book_id)
8✔
100
        return self._data_source.get_dividend(instrument)
8✔
101

102
    def get_split(self, order_book_id: str) -> Optional[np.ndarray]:
8✔
103
        """
104
        获取股票拆股信息
105

106
        :param str order_book_id: 合约代码
107
        
108
        :return: `numpy.ndarray` | `None`
109
            返回拆股信息的结构化数组,包含以下字段:
110
            
111
            =========================   ===================================================
112
            字段名                       描述  
113
            =========================   ===================================================
114
            ex_date                     除权日,格式为 YYYYMMDDHHMMSS 的整数(时分秒通常为000000)
115
            split_factor                拆股比例,例如:1.5 表示每股拆为 1.5 股
116
            =========================   ===================================================
117
            
118
            如果该合约没有拆股记录,则返回 None
119
        """
120
        instrument = self.instrument_not_none(order_book_id)
8✔
121
        return self._data_source.get_split(instrument)
8✔
122

123
    @lru_cache(10240)
8✔
124
    def _get_prev_close(self, order_book_id, dt, adjust_type: str = "pre"):
8✔
125
        instrument = self.instrument_not_none(order_book_id)
8✔
126
        prev_trading_date = self.get_previous_trading_date(dt)
8✔
127
        bar = self._data_source.history_bars(instrument, 1, '1d', 'close', prev_trading_date,
8✔
128
                                             skip_suspended=False, include_now=False, adjust_type=adjust_type, adjust_orig=dt)
129
        if bar is None or len(bar) < 1:
8✔
130
            return np.nan
×
131
        return bar[0]
8✔
132

133
    def get_prev_close(self, order_book_id, dt, adjust_type: str = "pre"):
8✔
134
        # 获取(基于当日前复权过的)昨收价
135
        return self._get_prev_close(order_book_id, dt.replace(hour=0, minute=0, second=0))
8✔
136

137
    @lru_cache(10240)
8✔
138
    def _get_prev_settlement(self, instrument, dt):
7✔
139
        bar = self._data_source.history_bars(
8✔
140
            instrument, 1, '1d', fields='prev_settlement', dt=dt, skip_suspended=False, adjust_orig=dt
141
        )
142
        if bar is None or len(bar) == 0:
8✔
143
            return np.nan
×
144
        return bar[0]
8✔
145

146
    @lru_cache(10240)
8✔
147
    def _get_settlement(self, instrument, dt):
7✔
148
        bar = self._data_source.history_bars(instrument, 1, '1d', 'settlement', dt, skip_suspended=False)
×
149
        if bar is None or len(bar) == 0:
×
150
            raise LookupError("'{}', dt={}".format(instrument.order_book_id, dt))
×
151
        return bar[0]
×
152

153
    def get_prev_settlement(self, order_book_id, dt):
8✔
154
        instrument = self.instruments(order_book_id)
8✔
155
        if instrument.type not in (INSTRUMENT_TYPE.FUTURE, INSTRUMENT_TYPE.OPTION):
8✔
156
            return np.nan
×
157
        return self._get_prev_settlement(instrument, dt)
8✔
158

159
    def get_settlement(self, instrument: Instrument, dt: datetime) -> float:
8✔
160
        if instrument.type != INSTRUMENT_TYPE.FUTURE:
×
161
            raise LookupError("'{}', instrument_type={}".format(instrument.order_book_id, instrument.type))
×
162
        return self._get_settlement(instrument, dt)
×
163

164
    def get_settle_price(self, order_book_id, trading_dt: datetime):
8✔
165
        instrument = self.get_active_instrument(order_book_id, trading_dt)
8✔
166
        if instrument.type != 'Future':
8✔
167
            return np.nan
×
168
        return self._data_source.get_settle_price(instrument, trading_dt)
8✔
169

170
    @lru_cache(512)
8✔
171
    def get_bar(self, order_book_id: str, dt: date, frequency: str = '1d') -> BarObject:
8✔
172
        instrument = self.instruments(order_book_id)
8✔
173
        if dt is None:
8✔
174
            return BarObject(instrument, NANDict, dt)
×
175
        bar = self._data_source.get_bar(instrument, dt, frequency)
8✔
176
        if bar:
8✔
177
            return BarObject(instrument, bar)
8✔
178
        return BarObject(instrument, NANDict, dt)
×
179

180
    def get_open_auction_bar(self, order_book_id: str, dt):
8✔
181
        instrument = self.instruments(order_book_id)
8✔
182
        try:
8✔
183
            bar = self._data_source.get_open_auction_bar(instrument, dt)
8✔
184
        except NotImplementedError:
×
185
            # forward compatible
186
            tick = self.current_snapshot(order_book_id, "1d", dt)
×
187
            bar = {k: getattr(tick, k) for k in [
×
188
                "datetime", "open", "limit_up", "limit_down", "volume", "total_turnover"
189
            ]}
190
        return PartialBarObject(instrument, bar)
8✔
191
    
192
    def get_open_auction_volume(self, order_book_id, dt):
8✔
193
        instrument = self.instruments(order_book_id)
8✔
194
        volume = self._data_source.get_open_auction_volume(instrument, dt)
8✔
195
        return volume
8✔
196

197
    def history(self, order_book_id, bar_count, frequency, field, dt):
8✔
198
        data = self.history_bars(order_book_id, bar_count, frequency,
×
199
                                 ['datetime', field], dt, skip_suspended=False, adjust_orig=dt)
200
        if data is None:
×
201
            return None
×
202
        return pd.Series(data[field], index=[convert_int_to_datetime(t) for t in data['datetime']])
×
203

204
    def fast_history(self, order_book_id, bar_count, frequency, field, dt):
8✔
205
        return self.history_bars(order_book_id, bar_count, frequency, field, dt, skip_suspended=False,
×
206
                                 adjust_type='pre', adjust_orig=dt)
207

208
    def history_bars(
8✔
209
        self,
210
        order_book_id: str,
211
        bar_count: Optional[int],
212
        frequency: str, 
213
        field: Union[str, List[str], None], 
214
        dt: datetime,
215
        skip_suspended: bool = True, 
216
        include_now: bool = False, 
217
        adjust_type: str = 'pre', 
218
        adjust_orig: Optional[datetime] = None
219
    ):
220
        instruments = self.get_instrument_history(order_book_id, dt)
8✔
221
        if len(instruments) == 0:
8✔
NEW
222
            raise InstrumentNotFound(_("No instrument found at {dt}: {id_or_sym}").format(dt=dt, id_or_sym=order_book_id))
×
223
        instrument = instruments[-1]
8✔
224
        if adjust_orig is None:
8✔
225
            adjust_orig = dt
8✔
226
        return self._data_source.history_bars(instrument, bar_count, frequency, field, dt,
8✔
227
                                              skip_suspended=skip_suspended, include_now=include_now,
228
                                              adjust_type=adjust_type, adjust_orig=adjust_orig)
229

230
    def history_ticks(self, order_book_id, count, dt):
8✔
231
        instrument = self.instrument_not_none(order_book_id)
×
232
        return self._data_source.history_ticks(instrument, count, dt)
×
233

234
    def current_snapshot(self, order_book_id, frequency, dt):
8✔
235

236
        def tick_fields_for(ins):
8✔
237
            _STOCK_FIELD_NAMES = [
8✔
238
                'datetime', 'open', 'high', 'low', 'last', 'volume', 'total_turnover', 'prev_close',
239
                'limit_up', 'limit_down'
240
            ]
241
            _FUTURE_FIELD_NAMES = _STOCK_FIELD_NAMES + ['open_interest', 'prev_settlement']
8✔
242

243
            if ins.type not in [INSTRUMENT_TYPE.FUTURE, INSTRUMENT_TYPE.OPTION]:
8✔
244
                return _STOCK_FIELD_NAMES
8✔
245
            else:
246
                return _FUTURE_FIELD_NAMES
×
247

248
        instrument = self.instruments(order_book_id)
8✔
249
        if frequency == '1d':
8✔
250
            bar = self._data_source.get_bar(instrument, dt, '1d')
8✔
251
            if not bar:
8✔
252
                return None
×
253
            d = {k: bar[k] for k in tick_fields_for(instrument) if k in bar.dtype.names}
8✔
254
            d["last"] = bar["open"] if ExecutionContext.phase() == EXECUTION_PHASE.OPEN_AUCTION else bar["close"]
8✔
255
            d['prev_close'] = self._get_prev_close(order_book_id, dt)
8✔
256
            return TickObject(instrument, d)
8✔
257

258
        return self._data_source.current_snapshot(instrument, frequency, dt)
×
259

260
    def available_data_range(self, frequency):
8✔
261
        return self._data_source.available_data_range(frequency)
8✔
262

263
    def get_futures_trading_parameters(self, order_book_id, dt):
8✔
264
        # type: (str, datetime.date) -> FuturesTradingParameters
265
        instrument = self.instruments(order_book_id)
8✔
266
        return self._data_source.get_futures_trading_parameters(instrument, dt)
8✔
267

268
    def get_merge_ticks(self, order_book_id_list, trading_date, last_dt=None):
8✔
269
        return self._data_source.get_merge_ticks(order_book_id_list, trading_date, last_dt)
×
270

271
    def is_suspended(self, order_book_id: str, dt: DateLike, count: int = 1) -> Union[bool, List[bool]]:
8✔
272
        if count == 1:
8✔
273
            return self._data_source.is_suspended(order_book_id, [dt])[0]
8✔
274

275
        trading_dates = self.get_n_trading_dates_until(dt, count)
×
276
        return self._data_source.is_suspended(order_book_id, trading_dates)
×
277

278
    def is_st_stock(self, order_book_id: str, dt: DateLike, count: int = 1) -> Union[bool, List[bool]]:
8✔
279
        if count == 1:
8✔
280
            return self._data_source.is_st_stock(order_book_id, [dt])[0]
×
281

282
        trading_dates = self.get_n_trading_dates_until(dt, count)
8✔
283
        return self._data_source.is_st_stock(order_book_id, trading_dates)
8✔
284

285
    def get_tick_size(self, order_book_id):
8✔
286
        return self.instruments(order_book_id).tick_size()
×
287

288
    def get_last_price(self, order_book_id: str) -> float:
8✔
289
        return float(self._price_board.get_last_price(order_book_id))
8✔
290

291
    def get_future_contracts(self, underlying, date):
8✔
292
        # type: (str, DateLike) -> List[str]
293
        return sorted(i.order_book_id for i in self.all_instruments(
×
294
            [INSTRUMENT_TYPE.FUTURE], date
295
        ) if i.underlying_symbol == underlying and not Instrument.is_future_continuous_contract(i.order_book_id))
296

297
    def get_trading_period(self, sym_or_ids, default_trading_period=None):
8✔
298
        # type: (StrOrIter, Optional[Sequence[TimeRange]]) -> List[TimeRange]
299
        trading_period = default_trading_period or []
8✔
300
        for instrument in self.instruments(sym_or_ids):
8✔
301
            trading_period.extend(instrument.trading_hours or [])
8✔
302
        return merge_trading_period(trading_period)
8✔
303

304
    def is_night_trading(self, sym_or_ids):
8✔
305
        # type: (StrOrIter) -> bool
306
        return any((instrument.trade_at_night for instrument in self.instruments(sym_or_ids)))
8✔
307

308
    def get_algo_bar(self, id_or_ins, order_style, dt):
8✔
309
        # type: (Union[str, Instrument], Union[*ALGO_ORDER_STYLES], datetime) -> Tuple[float, int]
310
        if not isinstance(order_style, ALGO_ORDER_STYLES):
×
311
            raise RuntimeError("get_algo_bar only support VWAPOrder and TWAPOrder")
×
312
        if not isinstance(id_or_ins, Instrument):
×
313
            id_or_ins = self.instrument_not_none(id_or_ins)
×
314
        # 存在一些有日线没分钟线的情况,如果不是缺了,通常都是因为volume为0,用日线先判断确认下
315
        day_bar = self.get_bar(order_book_id=id_or_ins.order_book_id, dt=dt, frequency="1d")
×
316
        if day_bar.volume == 0:
×
317
            return np.nan, 0
×
318
        bar = self._data_source.get_algo_bar(id_or_ins, order_style.start_min, order_style.end_min, dt)
×
319
        return (bar[order_style.TYPE], bar["volume"]) if bar else (np.nan, 0)
×
320

321
    def get_exchange_rate(self, date: date, local: MARKET, settlement: MARKET = MARKET.CN) -> ExchangeRate:
8✔
322
        return self._data_source.get_exchange_rate(date, local, settlement)
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc