• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ricequant / rqalpha / 19357934277

14 Nov 2025 07:48AM UTC coverage: 65.172% (-0.1%) from 65.319%
19357934277

push

github

web-flow
Merge pull request #948 from ricequant/develop

Develop

3 of 5 new or added lines in 2 files covered. (60.0%)

20 existing lines in 2 files now uncovered.

6787 of 10414 relevant lines covered (65.17%)

4.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.42
/rqalpha/data/bundle.py
1
# -*- coding: utf-8 -*-
2
# 版权所有 2019 深圳米筐科技有限公司(下称“米筐科技”)
3
#
4
# 除非遵守当前许可,否则不得使用本软件。
5
#
6
#     * 非商业用途(非商业用途指个人出于非商业目的使用本软件,或者高校、研究所等非营利机构出于教育、科研等目的使用本软件):
7
#         遵守 Apache License 2.0(下称“Apache 2.0 许可”),您可以在以下位置获得 Apache 2.0 许可的副本:
8
#         http://www.apache.org/licenses/LICENSE-2.0。
9
#         除非法律有要求或以书面形式达成协议,否则本软件分发时需保持当前许可“原样”不变,且不得附加任何条件。
10
#
11
#     * 商业用途(商业用途指个人出于任何商业目的使用本软件,或者法人或其他组织出于任何目的使用本软件):
12
#         未经米筐科技授权,任何个人不得出于任何商业目的使用本软件(包括但不限于向第三方提供、销售、出租、出借、转让本软件、本软件的衍生产品、引用或借鉴了本软件功能或源代码的产品或服务),任何法人或其他组织不得出于任何目的使用本软件,否则米筐科技有权追究相应的知识产权侵权责任。
13
#         在此前提下,对本软件的使用同样需要遵守 Apache 2.0 许可,Apache 2.0 许可与本许可冲突之处,以本许可为准。
14
#         详细的授权流程,请联系 public@ricequant.com 获取。
15
import datetime
7✔
16
import json
7✔
17
import os
7✔
18
import pickle
7✔
19
import re
7✔
20
from itertools import chain
7✔
21
from typing import Callable, Optional, Union, List
7✔
22
from filelock import FileLock, Timeout
7✔
23
import multiprocessing
7✔
24
from multiprocessing.sharedctypes import Synchronized
7✔
25
from ctypes import c_bool
7✔
26

27
import h5py
7✔
28
import numpy as np
7✔
29
from rqalpha.apis.api_rqdatac import rqdatac
7✔
30
from rqalpha.utils.concurrent import ProgressedProcessPoolExecutor, ProgressedTask
7✔
31
from rqalpha.utils.datetime_func import convert_date_to_date_int, convert_date_to_int
7✔
32
from rqalpha.utils.i18n import gettext as _
7✔
33
from rqalpha.utils.functools import lru_cache
7✔
34
from rqalpha.utils.logger import init_logger, system_log
7✔
35
from rqalpha.environment import Environment
7✔
36
from rqalpha.model.instrument import Instrument
7✔
37

38

39
# Earliest trading day covered by the bundle (first A-share session of 2005).
START_DATE = 20050104
# Far-future sentinel used as the open end of date-range queries.
END_DATE = 29991231
41

42

43
def gen_instruments(d):
    """Dump metadata of every instrument known to rqdatac into <d>/instruments.pk."""
    order_book_ids = sorted(rqdatac.all_instruments().order_book_id)
    records = [ins.__dict__ for ins in rqdatac.instruments(order_book_ids)]
    with open(os.path.join(d, 'instruments.pk'), 'wb') as out:
        # protocol=2 keeps the pickle readable by older Python versions
        pickle.dump(records, out, protocol=2)
48

49

50
def gen_yield_curve(d):
    """Fetch the yield curve from START_DATE until today into <d>/yield_curve.h5."""
    curve = rqdatac.get_yield_curve(start_date=START_DATE, end_date=datetime.date.today())
    # index becomes yyyymmdd ints named 'date'
    curve.index = [convert_date_to_date_int(day) for day in curve.index]
    curve.index.name = 'date'
    with h5py.File(os.path.join(d, 'yield_curve.h5'), 'w') as f:
        f.create_dataset('data', data=curve.to_records())
56

57

58
def gen_trading_dates(d):
    """Store all trading dates (as yyyymmdd ints) in <d>/trading_dates.npy."""
    trading_days = rqdatac.get_trading_dates(start_date=START_DATE, end_date='2999-01-01')
    as_ints = np.array([convert_date_to_date_int(day) for day in trading_days])
    np.save(os.path.join(d, 'trading_dates.npy'), as_ints, allow_pickle=False)
62

63

64
def gen_st_days(d):
    """Store, per stock, the days it carried ST status in <d>/st_stock_days.h5."""
    from rqdatac.client import get_client
    order_book_ids = rqdatac.all_instruments('CS').order_book_id.tolist()
    today = convert_date_to_date_int(datetime.date.today())
    st_days = get_client().execute('get_st_days', order_book_ids, START_DATE, today)
    with h5py.File(os.path.join(d, 'st_stock_days.h5'), 'w') as h5:
        for obid, days in st_days.items():
            h5[obid] = days
72

73

74
def gen_suspended_days(d):
    """Store, per stock, its trading-suspension days in <d>/suspended_days.h5."""
    from rqdatac.client import get_client
    order_book_ids = rqdatac.all_instruments('CS').order_book_id.tolist()
    today = convert_date_to_date_int(datetime.date.today())
    suspended_days = get_client().execute('get_suspended_days', order_book_ids, START_DATE, today)
    with h5py.File(os.path.join(d, 'suspended_days.h5'), 'w') as h5:
        for obid, days in suspended_days.items():
            h5[obid] = days
82

83

84
def gen_dividends(d):
    """Store per-stock dividend records in <d>/dividends.h5."""
    order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
    df = rqdatac.get_dividend(order_book_ids)
    df = df[["dividend_cash_before_tax", "book_closure_date", "ex_dividend_date", "payable_date", "round_lot"]]
    df.reset_index(inplace=True)
    df.rename(columns={'declaration_announcement_date': 'announcement_date'}, inplace=True)
    # convert every date column to a yyyymmdd int
    for col in ('book_closure_date', 'ex_dividend_date', 'payable_date', 'announcement_date'):
        df[col] = [convert_date_to_date_int(day) for day in df[col]]
    df.set_index(['order_book_id', 'book_closure_date'], inplace=True)
    with h5py.File(os.path.join(d, 'dividends.h5'), 'w') as h5:
        for obid in df.index.levels[0]:
            h5[obid] = df.loc[obid].to_records()
97

98

99
def gen_splits(d):
    """Store per-stock split factors in <d>/split_factor.h5."""
    order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
    df = rqdatac.get_split(order_book_ids)
    df['split_factor'] = df['split_coefficient_to'] / df['split_coefficient_from']
    df = df[['split_factor']]
    df.reset_index(inplace=True)
    df.rename(columns={'ex_dividend_date': 'ex_date'}, inplace=True)
    df['ex_date'] = [convert_date_to_int(day) for day in df['ex_date']]
    df.set_index(['order_book_id', 'ex_date'], inplace=True)

    with h5py.File(os.path.join(d, 'split_factor.h5'), 'w') as h5:
        for obid in df.index.levels[0]:
            h5[obid] = df.loc[obid].to_records()
112

113

114
def gen_ex_factor(d):
    """Store cumulative ex-dividend factors in <d>/ex_cum_factor.h5."""
    order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
    df = rqdatac.get_ex_factor(order_book_ids)
    df.reset_index(inplace=True)
    df['ex_date'] = [convert_date_to_int(day) for day in df['ex_date']]
    df.rename(columns={'ex_date': 'start_date'}, inplace=True)
    df.set_index(['order_book_id', 'start_date'], inplace=True)
    df = df[['ex_cum_factor']]

    # Sentinel row (start_date=0, factor=1.0) prepended to every dataset so
    # lookups before the first real ex-date resolve to a factor of 1.
    dtype = df.loc[df.index.levels[0][0]].to_records().dtype
    initial = np.empty((1,), dtype=dtype)
    initial['start_date'] = 0
    initial['ex_cum_factor'] = 1.0

    with h5py.File(os.path.join(d, 'ex_cum_factor.h5'), 'w') as h5:
        for obid in df.index.levels[0]:
            h5[obid] = np.concatenate([initial, df.loc[obid].to_records()])
131

132

133
def gen_share_transformation(d):
    """Store the share-transformation (code change) mapping as <d>/share_transformation.json."""
    df = rqdatac.get_share_transformation()
    # keep one record per predecessor code
    df.drop_duplicates("predecessor", inplace=True)
    df.set_index('predecessor', inplace=True)
    df.effective_date = df.effective_date.astype(str)
    df.predecessor_delisted_date = df.predecessor_delisted_date.astype(str)

    with open(os.path.join(d, 'share_transformation.json'), 'w') as f:
        f.write(df.to_json(orient='index'))
143

144

145
def gen_future_info(d):
    """Generate <d>/future_info.json: commission, margin-rate and tick-size info
    for futures contracts.

    Entries already present in the file are kept; missing contracts and the
    hard-coded legacy underlyings below are appended. Old-format files missing
    the ``margin_rate`` field are migrated in place first.

    Fixes: ``_need_to_recreate`` now returns an explicit bool (it previously
    fell off the end returning ``None``); local typo ``instruemnts_data``
    renamed to ``instruments_data``; the final write reuses ``future_info_file``.
    """
    future_info_file = os.path.join(d, 'future_info.json')

    def _need_to_recreate():
        # type: () -> bool
        """True when an existing file predates the `margin_rate` field."""
        if not os.path.exists(future_info_file):
            return False
        with open(future_info_file, "r") as f:
            all_futures_info = json.load(f)
        return "margin_rate" not in all_futures_info[0]

    def update_margin_rate(file):
        """Backfill `margin_rate` into every record of an old-format file."""
        all_instruments_data = rqdatac.all_instruments("Future")
        with open(file, "r") as f:
            all_futures_info = json.load(f)
            new_all_futures_info = []
            for future_info in all_futures_info:
                if "order_book_id" in future_info:
                    future_info["margin_rate"] = all_instruments_data[
                        all_instruments_data["order_book_id"] == future_info["order_book_id"]
                    ].iloc[0].margin_rate
                elif "underlying_symbol" in future_info:
                    # use the latest dominant contract's margin rate for symbol-level entries
                    dominant = rqdatac.futures.get_dominant(future_info["underlying_symbol"])[-1]
                    future_info["margin_rate"] = all_instruments_data[
                        all_instruments_data["order_book_id"] == dominant
                    ].iloc[0].margin_rate
                new_all_futures_info.append(future_info)
        os.remove(file)
        with open(file, "w") as f:
            json.dump(new_all_futures_info, f, separators=(',', ':'), indent=2)

    if _need_to_recreate():
        update_margin_rate(future_info_file)

    # When adding new kinds here, also update
    # rqalpha.data.base_data_source.storages.FutureInfoStore.data_compatible accordingly.
    hard_code = [
        {'underlying_symbol': 'TC',
          'close_commission_ratio': 4.0,
          'close_commission_today_ratio': 0.0,
          'commission_type': "by_volume",
          'open_commission_ratio': 4.0,
          'margin_rate': 0.05,
          'tick_size': 0.2},
         {'underlying_symbol': 'ER',
          'close_commission_ratio': 2.5,
          'close_commission_today_ratio': 2.5,
          'commission_type': "by_volume",
          'open_commission_ratio': 2.5,
          'margin_rate': 0.05,
          'tick_size': 1.0},
         {'underlying_symbol': 'WS',
          'close_commission_ratio': 2.5,
          'close_commission_today_ratio': 0.0,
          'commission_type': "by_volume",
          'open_commission_ratio': 2.5,
          'margin_rate': 0.05,
          'tick_size': 1.0},
         {'underlying_symbol': 'RO',
          'close_commission_ratio': 2.5,
          'close_commission_today_ratio': 0.0,
          'commission_type': "by_volume",
          'open_commission_ratio': 2.5,
          'margin_rate': 0.05,
          'tick_size': 2.0},
         {'underlying_symbol': 'ME',
          'close_commission_ratio': 1.4,
          'close_commission_today_ratio': 0.0,
          'commission_type': "by_volume",
          'open_commission_ratio': 1.4,
          'margin_rate': 0.06,
          'tick_size': 1.0},
        {'underlying_symbol': 'WT',
         'close_commission_ratio': 5.0,
         'close_commission_today_ratio': 5.0,
         'commission_type': "by_volume",
         'open_commission_ratio': 5.0,
         'margin_rate': 0.05,
         'tick_size': 1.0},
    ]

    if not os.path.exists(future_info_file):
        all_futures_info = hard_code
    else:
        with open(future_info_file, 'r') as f:
            all_futures_info = json.load(f)

    future_list = []
    symbol_list = []
    param = ['close_commission_ratio', 'close_commission_today_ratio', 'commission_type', 'open_commission_ratio']

    for i in all_futures_info:
        if i.get('order_book_id'):
            future_list.append(i.get('order_book_id'))
        else:
            symbol_list.append(i.get('underlying_symbol'))

    # Merge any newly hard-coded entries, so users need not delete
    # future_info.json by hand after hard_code changes.
    for info in hard_code:
        if info["underlying_symbol"] not in symbol_list:
            all_futures_info.append(info)
            symbol_list.append(info["underlying_symbol"])

    futures_order_book_id = rqdatac.all_instruments(type='Future')['order_book_id'].unique()
    commission_df = rqdatac.futures.get_commission_margin()
    for future in futures_order_book_id:
        underlying_symbol = re.match(r'^[a-zA-Z]*', future).group()
        if future in future_list:
            continue
        future_dict = {}
        commission = commission_df[commission_df['order_book_id'] == future]
        if not commission.empty:
            # contract-level commission data available: record a per-contract entry
            future_dict['order_book_id'] = future
            commission = commission.iloc[0]
            for p in param:
                future_dict[p] = commission[p]
            instruments_data = rqdatac.instruments(future)
            future_dict['margin_rate'] = instruments_data.margin_rate
            future_dict['tick_size'] = instruments_data.tick_size()
        elif underlying_symbol in symbol_list:
            continue
        else:
            # fall back to the dominant contract of the underlying symbol
            symbol_list.append(underlying_symbol)
            future_dict['underlying_symbol'] = underlying_symbol
            try:
                dominant = rqdatac.futures.get_dominant(underlying_symbol).iloc[-1]
            except AttributeError:
                # FIXME: why does get_dominant return None???
                continue

            dominant_indexer = commission_df["order_book_id"] == dominant
            if not dominant_indexer.any():
                # S0301: the last soybean contract appears in instruments but has
                # no commission data; skip such contracts.
                continue
            commission = commission_df[dominant_indexer].iloc[0]

            for p in param:
                future_dict[p] = commission[p]
            instruments_data = rqdatac.instruments(dominant)
            future_dict['margin_rate'] = instruments_data.margin_rate
            future_dict['tick_size'] = instruments_data.tick_size()
        all_futures_info.append(future_dict)

    with open(future_info_file, 'w') as f:
        json.dump(all_futures_info, f, separators=(',', ':'), indent=2)
285

286

287
class GenerateFileTask(ProgressedTask):
    """ProgressedTask wrapper that runs one bundle-generation function.

    Progress reporting is coarse: the whole run counts as a single
    100-step jump once the wrapped function returns.
    """

    def __init__(self, func):
        # func: one of the gen_* functions above; called with the bundle directory.
        self._func = func
        self._step = 100

    @property
    def total_steps(self):
        # type: () -> int
        return self._step

    def __call__(self, *args, **kwargs):
        # Run the wrapped function, then report completion in one step.
        self._func(*args, **kwargs)
        yield self._step
300

301

302
# Day-bar fields persisted for each instrument class.
STOCK_FIELDS = ['open', 'close', 'high', 'low', 'prev_close', 'limit_up', 'limit_down', 'volume', 'total_turnover']
# Indexes carry no price limits.
INDEX_FIELDS = ['open', 'close', 'high', 'low', 'prev_close', 'volume', 'total_turnover']
# Futures additionally carry settlement prices and open interest.
FUTURES_FIELDS = STOCK_FIELDS + ['settlement', 'prev_settlement', 'open_interest']
FUND_FIELDS = STOCK_FIELDS
306

307

308
class DayBarTask(ProgressedTask):
    """Base class for day-bar generation/update tasks over a list of order_book_ids."""

    def __init__(self, order_book_ids):
        self._order_book_ids = order_book_ids

    @property
    def total_steps(self):
        # type: () -> int
        # One progress step per instrument.
        return len(self._order_book_ids)

    def __call__(self, path, fields, **kwargs):
        # Subclasses implement the actual fetch/write; kwargs go to h5py create_dataset.
        raise NotImplementedError
319

320

321
class GenerateDayBarTask(DayBarTask):
    """Create a day-bar h5 file from scratch for all order_book_ids.

    A generator task: each ``yield`` reports how many instruments were just
    processed, so the pool executor can track progress.
    """

    def __call__(self, path, fields, **kwargs):
        # kwargs are forwarded to h5py create_dataset (e.g. compression).
        try:
            h5 = h5py.File(path, "w")
        except OSError:
            system_log.error("File {} update failed, if it is using, please update later, "
                            "or you can delete then update again".format(path))
            # sval is the shared success flag installed by process_init.
            # NOTE(review): assumes process_init ran in this process — confirm.
            sval.value = False
            yield 1
        else:
            with h5:
                # fetch price data in batches of 300 instruments per rqdatac call
                i, step = 0, 300
                while True:
                    order_book_ids = self._order_book_ids[i:i + step]
                    df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
                                        adjust_type='none', fields=fields, expect_df=True)
                    if not (df is None or df.empty):
                        df.reset_index(inplace=True)
                        # store dates as int under a 'datetime' column
                        df['datetime'] = [convert_date_to_int(d) for d in df['date']]
                        del df['date']
                        df.set_index(['order_book_id', 'datetime'], inplace=True)
                        df.sort_index(inplace=True)
                        # one h5 dataset per instrument
                        for order_book_id in df.index.levels[0]:
                            h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
                    i += step
                    yield len(order_book_ids)
                    if i >= len(self._order_book_ids):
                        break
349

350

351
class UpdateDayBarTask(DayBarTask):
    """Incrementally update an existing day-bar h5 file.

    Falls back to a full regeneration (GenerateDayBarTask) when the existing
    file is unreadable or its field layout does not match ``fields``.
    """

    def h5_has_valid_fields(self, h5, wanted_fields):
        """Return True when the first dataset's fields equal wanted_fields + 'datetime'."""
        obid_gen = (k for k in h5.keys())
        wanted_fields = set(wanted_fields)
        wanted_fields.add('datetime')
        try:
            h5_fields = set(h5[next(obid_gen)].dtype.fields.keys())
        except StopIteration:
            # file contains no datasets: treat as invalid so it gets regenerated
            pass
        else:
            return h5_fields == wanted_fields
        return False

    def __call__(self, path, fields, **kwargs):
        # Probe the existing file first; any read failure or field mismatch
        # means we must rebuild from scratch.
        need_recreate_h5 = False
        try:
            with h5py.File(path, 'r') as h5:
                need_recreate_h5 = not self.h5_has_valid_fields(h5, fields)
        except (OSError, RuntimeError):
            need_recreate_h5 = True
        if need_recreate_h5:
            yield from GenerateDayBarTask(self._order_book_ids)(path, fields, **kwargs)
        else:
            h5 = None
            try:
                h5 = h5py.File(path, 'a')
            except OSError:
                system_log.error("File {} update failed, if it is using, please update later, "
                                "or you can delete then update again".format(path))
                # shared success flag installed by process_init
                sval.value = False
                yield 1
            else:
                is_futures = "futures" == os.path.basename(path).split(".")[0]
                for order_book_id in self._order_book_ids:
                    # Pre-adjusted ("888") futures contracts always get a full
                    # refresh rather than an incremental append.
                    is_pre = is_futures and "888" in order_book_id
                    if order_book_id in h5 and not is_pre:
                        try:
                            # last stored day, stripped of the HHMMSS suffix
                            last_date = int(h5[order_book_id]['datetime'][-1] // 1000000)
                        except OSError:
                            system_log.error("File {} update failed, if it is using, please update later, "
                                            "or you can delete then update again".format(path))
                            sval.value = False
                            yield 1
                            break
                        except ValueError:
                            # corrupt/empty dataset: drop it and refetch everything
                            h5.pop(order_book_id)
                            start_date = START_DATE
                        else:
                            start_date = rqdatac.get_next_trading_date(last_date)
                    else:
                        start_date = START_DATE
                    df = rqdatac.get_price(order_book_id, start_date, END_DATE, '1d',
                                        adjust_type='none', fields=fields, expect_df=True)
                    if not (df is None or df.empty):
                        df = df[fields]  # Future order_book_id like SC888 will auto add 'dominant_id'
                        df = df.loc[order_book_id]
                        df.reset_index(inplace=True)
                        df['datetime'] = [convert_date_to_int(d) for d in df['date']]
                        del df['date']
                        df.set_index('datetime', inplace=True)
                        if order_book_id in h5:
                            # append new rows to the existing records, preserving dtype
                            data = np.array(
                                [tuple(i) for i in chain(h5[order_book_id][:], df.to_records())],
                                dtype=h5[order_book_id].dtype
                            )
                            del h5[order_book_id]
                            h5.create_dataset(order_book_id, data=data, **kwargs)
                        else:
                            h5.create_dataset(order_book_id, data=df.to_records(), **kwargs)
                    yield 1
            finally:
                if h5:
                    h5.close()
425

426

427
def process_init(args: Optional[Synchronized] = None, kwargs = None):
    """Worker-process initializer: init rqdatac + logging, install shared state.

    :param args: shared boolean success flag, exposed as module global ``sval``
    :param kwargs: keyword arguments forwarded to ``rqdatac.init``
    """
    import warnings
    with warnings.catch_warnings(record=True):
        # silence warning: rqdatac is already inited. Settings will be changed
        rqdatac.init(**(kwargs or {}))
    init_logger()
    if args:
        # make the shared success flag reachable from the task classes
        global sval
        sval = args
438

439

440
def update_bundle(path, create, enable_compression=False, concurrency=1, **kwargs):
    """Create or update the rqalpha data bundle under ``path``.

    :param path: bundle directory
    :param create: True regenerates day-bar files from scratch, False updates incrementally
    :param enable_compression: gzip-compress h5 datasets when True
    :param concurrency: number of worker processes
    :param kwargs: forwarded to ``rqdatac.init`` inside each worker process
    :return: True when every task succeeded

    Fix: the h5py dataset options were previously assigned to a local also
    named ``kwargs``, shadowing the function's ``**kwargs`` right after that
    dict had been captured in ``initargs`` — renamed to ``h5_kwargs``.
    """
    if create:
        _DayBarTask = GenerateDayBarTask
    else:
        _DayBarTask = UpdateDayBarTask

    init_logger()
    day_bar_args = (
        ("stocks.h5", rqdatac.all_instruments('CS').order_book_id.tolist(), STOCK_FIELDS),
        ("indexes.h5", rqdatac.all_instruments('INDX').order_book_id.tolist(), INDEX_FIELDS),
        ("futures.h5", rqdatac.all_instruments('Future').order_book_id.tolist(), FUTURES_FIELDS),
        ("funds.h5", rqdatac.all_instruments('FUND').order_book_id.tolist(), FUND_FIELDS),
    )

    rqdatac.reset()

    gen_file_funcs = (
        gen_instruments, gen_trading_dates, gen_dividends, gen_splits, gen_ex_factor, gen_st_days,
        gen_suspended_days, gen_yield_curve, gen_share_transformation, gen_future_info
    )

    # shared success flag; workers flip it to False on failure
    succeed = multiprocessing.Value(c_bool, True)
    # On Windows the worker processes must run rqdatac.init; on other OSes
    # rqdatac.reset would suffice (init includes reset's functionality).
    with ProgressedProcessPoolExecutor(
            max_workers=concurrency, initializer=process_init, initargs=(succeed, kwargs)
    ) as executor:
        # options forwarded to h5py create_dataset
        h5_kwargs = {}
        if enable_compression:
            h5_kwargs['compression'] = 9
        for func in gen_file_funcs:
            executor.submit(GenerateFileTask(func), path)
        for file, order_book_id, field in day_bar_args:
            executor.submit(_DayBarTask(order_book_id), os.path.join(path, file), field, **h5_kwargs)
    return succeed.value
474

475

476
class AutomaticUpdateBundle(object):
    """File-backed store of per-instrument daily data fetched on demand.

    The first time an instrument is requested in a run, data is pulled via
    ``api`` and appended to an h5 file (guarded by a file lock so concurrent
    rqalpha processes do not corrupt it); later requests are served from the
    file.

    Fix: in ``_auto_update_task`` the ``finally`` block unconditionally called
    ``h5.close()``; when acquiring the file lock raised ``Timeout`` (or opening
    the file failed), ``h5`` was unbound and the resulting ``NameError`` masked
    the real error. ``h5`` is now pre-initialized and the close is guarded.
    """

    def __init__(self, path: str, filename: str, api: Callable, fields: List[str], end_date: datetime.date, start_date: Union[int, datetime.date] = START_DATE) -> None:
        # NOTE(review): end_date is accepted for interface compatibility but is
        # not stored; the effective end date comes from self._env.config.base.end_date.
        if not os.path.exists(path):
            os.makedirs(path)
        self._file = os.path.join(path, filename)
        self._trading_dates = None
        self._filename = filename
        self._api = api
        self._fields = fields
        self._start_date = start_date
        self.updated = []  # order_book_ids already refreshed during this run
        self._env = Environment.get_instance()
        self._file_lock = FileLock(self._file + ".lock")

    def get_data(self, instrument: Instrument, dt: datetime.date) -> Optional[np.ndarray]:
        """Return the stored record of ``instrument`` for trading day ``dt``, or None."""
        dt = convert_date_to_date_int(dt)
        data = self._get_data_all_time(instrument)
        if data is None:
            return data
        else:
            try:
                data = data[np.searchsorted(data['trading_dt'], dt)]
            except IndexError:
                # dt lies beyond the last stored trading day
                data = None
            return data

    # NOTE: lru_cache on a method keeps instances alive for the cache lifetime;
    # acceptable here because bundle objects live for the whole strategy run.
    @lru_cache(128)
    def _get_data_all_time(self, instrument: Instrument) -> Optional[np.ndarray]:
        """Return all stored records for ``instrument``, refreshing the file once per run."""
        if instrument.order_book_id not in self.updated:
            self._auto_update_task(instrument)
            self.updated.append(instrument.order_book_id)
        with h5py.File(self._file, "r") as h5:
            data = h5[instrument.order_book_id][:]
            if len(data) == 0:
                return None
        return data

    def _auto_update_task(self, instrument: Instrument) -> None:
        """
        Automatically update the daily data needed while the rqalpha strategy runs.

        :param instrument: instrument object
        :type instrument: `Instrument`
        :raises OSError: when the data file cannot be opened or locked
        """
        order_book_id = instrument.order_book_id
        start_date = self._start_date
        h5 = None  # pre-bind so the finally block is safe if the lock/open fails
        try:
            with self._file_lock.acquire():
                h5 = h5py.File(self._file, "a")
                if order_book_id in h5 and h5[order_book_id].dtype.names:
                    if 'trading_dt' in h5[order_book_id].dtype.names:
                        # Dataset is in the current format: update incrementally
                        # from the day after the last stored trading day.
                        if len(h5[order_book_id][:]) != 0:
                            last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['trading_dt']), "%Y%m%d").date()
                            if last_date >= self._env.config.base.end_date:
                                return
                            start_date = self._env.data_proxy._data_source.get_next_trading_date(last_date).date()
                            if start_date > self._env.config.base.end_date:
                                return
                    else:
                        # legacy field layout: drop the dataset and refetch in full
                        del h5[order_book_id]

                arr = self._get_array(instrument, start_date)
                if arr is None:
                    if order_book_id not in h5:
                        # record an empty dataset so the instrument is not refetched
                        arr = np.array([])
                        h5.create_dataset(order_book_id, data=arr)
                else:
                    if order_book_id in h5:
                        # append new rows, preserving the existing dtype
                        data = np.array(
                            [tuple(i) for i in chain(h5[order_book_id][:], arr)],
                            dtype=h5[order_book_id].dtype)
                        del h5[order_book_id]
                        h5.create_dataset(order_book_id, data=data)
                    else:
                        h5.create_dataset(order_book_id, data=arr)
        except (OSError, Timeout) as e:
            raise OSError(_("File {} update failed, if it is using, please update later, "
                          "or you can delete then update again".format(self._file))) from e
        finally:
            # fix: only close when the file was actually opened
            if h5 is not None:
                h5.close()

    def _get_array(self, instrument: Instrument, start_date: datetime.date) -> Optional[np.ndarray]:
        """Fetch records via the api and pack them into a structured array (None when empty)."""
        df = self._api(instrument.order_book_id, start_date, self._env.config.base.end_date, self._fields)
        if not (df is None or df.empty):
            # rqdatac.get_open_auction_info adds 'open_interest'/'prev_settlement'
            # for futures; keep only the requested fields.
            df = df[self._fields].loc[instrument.order_book_id]
            record = df.iloc[0: 1].to_records()
            dtype = [('trading_dt', 'int')]
            for field in self._fields:
                dtype.append((field, record.dtype[field]))
            trading_dt = self._env.data_proxy._data_source.batch_get_trading_date(df.index)
            trading_dt = convert_date_to_date_int(trading_dt)
            arr = np.ones((trading_dt.shape[0], ), dtype=dtype)
            arr['trading_dt'] = trading_dt
            for field in self._fields:
                arr[field] = df[field].values
            return arr
        return None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc