• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ricequant / rqalpha / 21350729672

26 Jan 2026 08:17AM UTC coverage: 67.675%. First build
21350729672

Pull #961

github

web-flow
exclude tests (#965)

* exclude tests

* update

---------

Co-authored-by: Don <lin.dongzhao@ricequant.com>
Pull Request #961: Develop

501 of 585 new or added lines in 26 files covered. (85.64%)

7428 of 10976 relevant lines covered (67.67%)

5.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.64
/rqalpha/data/bundle.py
1
# -*- coding: utf-8 -*-
2
# 版权所有 2019 深圳米筐科技有限公司(下称“米筐科技”)
3
#
4
# 除非遵守当前许可,否则不得使用本软件。
5
#
6
#     * 非商业用途(非商业用途指个人出于非商业目的使用本软件,或者高校、研究所等非营利机构出于教育、科研等目的使用本软件):
7
#         遵守 Apache License 2.0(下称“Apache 2.0 许可”),您可以在以下位置获得 Apache 2.0 许可的副本:
8
#         http://www.apache.org/licenses/LICENSE-2.0。
9
#         除非法律有要求或以书面形式达成协议,否则本软件分发时需保持当前许可“原样”不变,且不得附加任何条件。
10
#
11
#     * 商业用途(商业用途指个人出于任何商业目的使用本软件,或者法人或其他组织出于任何目的使用本软件):
12
#         未经米筐科技授权,任何个人不得出于任何商业目的使用本软件(包括但不限于向第三方提供、销售、出租、出借、转让本软件、本软件的衍生产品、引用或借鉴了本软件功能或源代码的产品或服务),任何法人或其他组织不得出于任何目的使用本软件,否则米筐科技有权追究相应的知识产权侵权责任。
13
#         在此前提下,对本软件的使用同样需要遵守 Apache 2.0 许可,Apache 2.0 许可与本许可冲突之处,以本许可为准。
14
#         详细的授权流程,请联系 public@ricequant.com 获取。
15
import datetime
8✔
16
import json
8✔
17
import os
8✔
18
import pickle
8✔
19
import re
8✔
20
from itertools import chain
8✔
21
from typing import Callable, Optional, Union, List, Iterable, Tuple
8✔
22
from filelock import FileLock, Timeout
8✔
23
import multiprocessing
8✔
24
from multiprocessing.sharedctypes import Synchronized
8✔
25
from ctypes import c_bool
8✔
26

27
import h5py
8✔
28
import numpy as np
8✔
29
from rqalpha.apis.api_rqdatac import rqdatac
8✔
30
from rqalpha.utils.concurrent import ProgressedProcessPoolExecutor, ProgressedTask
8✔
31
from rqalpha.utils.datetime_func import convert_date_to_date_int, convert_date_to_int
8✔
32
from rqalpha.utils.i18n import gettext as _
8✔
33
from rqalpha.utils.functools import lru_cache
8✔
34
from rqalpha.utils.logger import init_logger, system_log
8✔
35
from rqalpha.environment import Environment
8✔
36
from rqalpha.model.instrument import Instrument
8✔
37

38

39
# Date bounds used by the bundle generators below, encoded as YYYYMMDD integers.
START_DATE, END_DATE = 20050104, 29991231
8✔
41

42

43
def gen_instruments(d):
    """Pickle the attribute dict of every rqdatac instrument into ``instruments.pk``.

    :param d: directory in which ``instruments.pk`` is written
    """
    order_book_ids = list(rqdatac.all_instruments().order_book_id)
    order_book_ids.sort()

    records = []
    for instrument in rqdatac.instruments(order_book_ids):
        records.append(instrument.__dict__)

    target = os.path.join(d, 'instruments.pk')
    with open(target, 'wb') as out:
        pickle.dump(records, out, protocol=2)
×
48

49

50
def gen_yield_curve(d):
    """Fetch the yield curve from START_DATE until today and store it in ``yield_curve.h5``.

    The index is converted with ``convert_date_to_date_int`` and named ``date``;
    the records are written to the dataset ``data``.

    :param d: directory in which ``yield_curve.h5`` is written
    """
    today = datetime.date.today()
    curve = rqdatac.get_yield_curve(start_date=START_DATE, end_date=today)
    curve.index = [convert_date_to_date_int(day) for day in curve.index]
    curve.index.name = 'date'

    target = os.path.join(d, 'yield_curve.h5')
    with h5py.File(target, 'w') as h5:
        h5.create_dataset('data', data=curve.to_records())
×
56

57

58
def gen_trading_dates(d):
    """Save all trading dates from START_DATE to 2999-01-01 as ``trading_dates.npy``.

    :param d: directory in which ``trading_dates.npy`` is written
    """
    trading_dates = rqdatac.get_trading_dates(start_date=START_DATE, end_date='2999-01-01')
    as_ints = np.array([convert_date_to_date_int(day) for day in trading_dates])
    target = os.path.join(d, 'trading_dates.npy')
    np.save(target, as_ints, allow_pickle=False)
×
62

63

64
def gen_st_days(d):
    """Write the ``get_st_days`` result of every 'CS' instrument to ``st_stock_days.h5``.

    One dataset per order_book_id is created, covering START_DATE until today.

    :param d: directory in which ``st_stock_days.h5`` is written
    """
    from rqdatac.client import get_client

    order_book_ids = rqdatac.all_instruments('CS').order_book_id.tolist()
    today_int = convert_date_to_date_int(datetime.date.today())
    days_by_id = get_client().execute('get_st_days', order_book_ids, START_DATE, today_int)

    target = os.path.join(d, 'st_stock_days.h5')
    with h5py.File(target, 'w') as h5:
        for order_book_id, days in days_by_id.items():
            h5[order_book_id] = days
×
72

73

74
def gen_suspended_days(d):
    """Write the ``get_suspended_days`` result of every 'CS' instrument to ``suspended_days.h5``.

    One dataset per order_book_id is created, covering START_DATE until today.

    :param d: directory in which ``suspended_days.h5`` is written
    """
    from rqdatac.client import get_client

    order_book_ids = rqdatac.all_instruments('CS').order_book_id.tolist()
    today_int = convert_date_to_date_int(datetime.date.today())
    days_by_id = get_client().execute('get_suspended_days', order_book_ids, START_DATE, today_int)

    target = os.path.join(d, 'suspended_days.h5')
    with h5py.File(target, 'w') as h5:
        for order_book_id, days in days_by_id.items():
            h5[order_book_id] = days
×
82

83

84
class GenerateDividendBundle:
    """Callable that writes per-instrument dividend records to ``dividends.h5``."""

    def __init__(self, d: str):
        # Directory in which dividends.h5 is created.
        self.d = d

    def _get_dividend(self):
        """Fetch dividend data from rqdatac for every known order_book_id."""
        order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
        return rqdatac.get_dividend(order_book_ids)

    def _write(self, data_iter: Iterable[Tuple[str, np.ndarray]]):
        """Create one dataset per ``(order_book_id, records)`` pair in a fresh file."""
        target = os.path.join(self.d, 'dividends.h5')
        with h5py.File(target, "w") as h5:
            for order_book_id, data in data_iter:
                h5.create_dataset(order_book_id, data=data)

    def __call__(self):
        """Download, reshape and persist the dividend table.

        :raises RuntimeError: when no dividend data is returned
        """
        raw = self._get_dividend()
        if raw is None:
            raise RuntimeError("Got no dividend data")

        need_cols = ["dividend_cash_before_tax", "book_closure_date", "ex_dividend_date", "payable_date", "round_lot"]
        table = raw[need_cols].reset_index()
        table = table.rename(columns={'declaration_announcement_date': 'announcement_date'})

        date_columns = ('book_closure_date', 'ex_dividend_date', 'payable_date', 'announcement_date')
        for column in date_columns:
            table[column] = [convert_date_to_date_int(day) for day in table[column]]

        table = table.set_index(['order_book_id', 'book_closure_date'])

        # Build every record array up front, then write them in one pass.
        records = []
        for order_book_id in table.index.levels[0]:  # type: ignore
            records.append((order_book_id, table.loc[order_book_id].to_records()))
        self._write(records)
111

112

113
class GenerateSplitBundle:
    """Callable that writes per-instrument split factors to ``split_factor.h5``."""

    def __init__(self, d: str):
        # Directory in which split_factor.h5 is created.
        self.d = d

    def _get_split(self):
        """Fetch split data from rqdatac for every known order_book_id."""
        order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
        return rqdatac.get_split(order_book_ids)

    def _write(self, data_iter: Iterable[Tuple[str, np.ndarray]]):
        """Create one dataset per ``(order_book_id, records)`` pair in a fresh file."""
        target = os.path.join(self.d, 'split_factor.h5')
        with h5py.File(target, "w") as h5:
            for order_book_id, data in data_iter:
                h5.create_dataset(order_book_id, data=data)

    def __call__(self):
        """Download, reshape and persist the split table.

        :raises RuntimeError: when no split data is returned
        """
        raw = self._get_split()
        if raw is None:
            raise RuntimeError("Got no split data")

        # split_factor = coefficient_to / coefficient_from, stored as a new column
        # on the frame returned by _get_split.
        raw['split_factor'] = raw['split_coefficient_to'] / raw['split_coefficient_from']

        kept_columns = ['split_factor', 'split_coefficient_to', 'split_coefficient_from']
        table = raw[kept_columns].reset_index()
        table = table.rename(columns={'ex_dividend_date': 'ex_date'})  # type: ignore
        table['ex_date'] = [convert_date_to_int(day) for day in table['ex_date']]
        table = table.set_index(['order_book_id', 'ex_date'])

        # Build every record array up front, then write them in one pass.
        records = []
        for order_book_id in table.index.levels[0]:  # type: ignore
            records.append((order_book_id, table.loc[order_book_id].to_records()))
        self._write(records)
139

140
    
141
class GenerateExFactorBundle:
    """Callable that writes per-instrument cumulative ex-factors to ``ex_cum_factor.h5``."""

    def __init__(self, d: str):
        # Directory in which ex_cum_factor.h5 is created.
        self.d = d

    def _get_ex_factor(self):
        """Fetch ex-factor data from rqdatac for every known order_book_id."""
        order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
        return rqdatac.get_ex_factor(order_book_ids)

    def _write(self, data_iter: Iterable[Tuple[str, np.ndarray]]):
        """Create one dataset per ``(order_book_id, records)`` pair in a fresh file."""
        target = os.path.join(self.d, 'ex_cum_factor.h5')
        with h5py.File(target, "w") as h5:
            for order_book_id, data in data_iter:
                h5.create_dataset(order_book_id, data=data)

    def __call__(self):
        """Download, reshape and persist the ex-factor table.

        Every instrument's records are prefixed with one row whose
        ``start_date`` is 0 and whose ``ex_cum_factor`` is 1.0.

        :raises RuntimeError: when no ex-factor data is returned
        """
        table = self._get_ex_factor()
        if table is None:
            raise RuntimeError("Got no ex factor data")

        table.reset_index(inplace=True)
        table['ex_date'] = [convert_date_to_int(day) for day in table['ex_date']]
        table.rename(columns={'ex_date': 'start_date'}, inplace=True)
        table.set_index(['order_book_id', 'start_date'], inplace=True)
        table = table[['ex_cum_factor']]

        order_book_ids = table.index.levels[0]  # type: ignore

        # Borrow the record dtype from the first instrument to build the leading row.
        record_dtype = table.loc[order_book_ids[0]].to_records().dtype
        leading_row = np.empty((1,), dtype=record_dtype)
        leading_row['start_date'] = 0
        leading_row['ex_cum_factor'] = 1.0

        def _records():
            # Evaluated lazily while _write holds the output file open.
            for order_book_id in order_book_ids:
                factors = table.loc[order_book_id].to_records()
                yield order_book_id, np.concatenate([leading_row, factors])

        self._write(_records())
172

173

174
def gen_share_transformation(d):
    """Dump rqdatac share-transformation data to ``share_transformation.json``.

    Rows are de-duplicated on ``predecessor``, which becomes the index; both
    date columns are converted to strings before serialising with
    ``orient='index'``.

    :param d: directory in which ``share_transformation.json`` is written
    :raises RuntimeError: when no share transformation data is returned
    """
    table = rqdatac.get_share_transformation()
    if table is None:
        raise RuntimeError("Got no share transformation data")

    table.drop_duplicates("predecessor", inplace=True)
    table.set_index('predecessor', inplace=True)
    table.effective_date = table.effective_date.astype(str)
    table.predecessor_delisted_date = table.predecessor_delisted_date.astype(str)

    payload = table.to_json(orient='index')
    with open(os.path.join(d, 'share_transformation.json'), 'w') as out:
        out.write(payload)
×
186

187

188
def gen_future_info(d):
    """Create or refresh ``future_info.json`` (commission, margin rate and tick size of futures).

    Steps:

    1. If the file exists and its first entry has no ``margin_rate``, add
       ``margin_rate`` to the stored entries.
    2. Load the stored entries (or start from the hard-coded ones) and make
       sure every hard-coded underlying symbol is present.
    3. For every futures order_book_id that is not stored yet, add either a
       per-contract entry (when commission data exists for the contract) or a
       per-underlying-symbol entry derived from the dominant contract.
    4. Write the merged list back to ``future_info.json``.

    :param d: directory that holds ``future_info.json``
    """
    future_info_file = os.path.join(d, 'future_info.json')

    # When adding a new kind of hard_code entry, the contents of
    # rqalpha.data.base_data_source.storages.FutureInfoStore.data_compatible
    # must be updated as well.
    hard_code = [
        {'underlying_symbol': 'TC',
         'close_commission_ratio': 4.0,
         'close_commission_today_ratio': 0.0,
         'commission_type': "by_volume",
         'open_commission_ratio': 4.0,
         'margin_rate': 0.05,
         'tick_size': 0.2},
        {'underlying_symbol': 'ER',
         'close_commission_ratio': 2.5,
         'close_commission_today_ratio': 2.5,
         'commission_type': "by_volume",
         'open_commission_ratio': 2.5,
         'margin_rate': 0.05,
         'tick_size': 1.0},
        {'underlying_symbol': 'WS',
         'close_commission_ratio': 2.5,
         'close_commission_today_ratio': 0.0,
         'commission_type': "by_volume",
         'open_commission_ratio': 2.5,
         'margin_rate': 0.05,
         'tick_size': 1.0},
        {'underlying_symbol': 'RO',
         'close_commission_ratio': 2.5,
         'close_commission_today_ratio': 0.0,
         'commission_type': "by_volume",
         'open_commission_ratio': 2.5,
         'margin_rate': 0.05,
         'tick_size': 2.0},
        {'underlying_symbol': 'ME',
         'close_commission_ratio': 1.4,
         'close_commission_today_ratio': 0.0,
         'commission_type': "by_volume",
         'open_commission_ratio': 1.4,
         'margin_rate': 0.06,
         'tick_size': 1.0},
        {'underlying_symbol': 'WT',
         'close_commission_ratio': 5.0,
         'close_commission_today_ratio': 5.0,
         'commission_type': "by_volume",
         'open_commission_ratio': 5.0,
         'margin_rate': 0.05,
         'tick_size': 1.0},
    ]
    hard_code_margin_rates = {info['underlying_symbol']: info['margin_rate'] for info in hard_code}

    def _need_to_recreate():
        """Return True when the stored file exists, is non-empty and lacks ``margin_rate``."""
        if not os.path.exists(future_info_file):
            return False
        with open(future_info_file, "r") as f:
            stored = json.load(f)
        # An empty list has nothing to upgrade (previously this raised IndexError).
        return bool(stored) and "margin_rate" not in stored[0]

    def _margin_rate_of(all_instruments_data, order_book_id):
        """Look up ``margin_rate`` of one contract in the all_instruments frame."""
        matched = all_instruments_data[all_instruments_data["order_book_id"] == order_book_id]
        return matched.iloc[0].margin_rate

    def update_margin_rate(file):
        """Add ``margin_rate`` to every entry stored in ``file`` and rewrite the file."""
        all_instruments_data = rqdatac.all_instruments("Future")
        with open(file, "r") as f:
            stored = json.load(f)
        new_all_futures_info = []
        for future_info in stored:
            if "order_book_id" in future_info:
                future_info["margin_rate"] = _margin_rate_of(all_instruments_data, future_info["order_book_id"])
            elif "underlying_symbol" in future_info:
                symbol = future_info["underlying_symbol"]
                try:
                    # Positional access, consistent with the main loop below;
                    # plain [-1] on a Series is a label-based lookup.
                    dominant = rqdatac.futures.get_dominant(symbol).iloc[-1]
                except AttributeError:
                    # get_dominant returned None (the main loop tolerates this too):
                    # fall back to the hard-coded margin rate when there is one.
                    if symbol in hard_code_margin_rates:
                        future_info["margin_rate"] = hard_code_margin_rates[symbol]
                else:
                    future_info["margin_rate"] = _margin_rate_of(all_instruments_data, dominant)
            new_all_futures_info.append(future_info)
        os.remove(file)
        with open(file, "w") as f:
            json.dump(new_all_futures_info, f, separators=(',', ':'), indent=2)

    if _need_to_recreate():
        update_margin_rate(future_info_file)

    if not os.path.exists(future_info_file):
        all_futures_info = list(hard_code)
    else:
        with open(future_info_file, 'r') as f:
            all_futures_info = json.load(f)

    param = ['close_commission_ratio', 'close_commission_today_ratio', 'commission_type', 'open_commission_ratio']

    # Sets give O(1) membership tests inside the per-contract loop below.
    known_order_book_ids = set()
    known_symbols = set()
    for stored_info in all_futures_info:
        if stored_info.get('order_book_id'):
            known_order_book_ids.add(stored_info.get('order_book_id'))
        else:
            known_symbols.add(stored_info.get('underlying_symbol'))

    # After hard_code has been changed, this spares users from having to
    # delete future_info.json manually.
    for info in hard_code:
        if info["underlying_symbol"] not in known_symbols:
            all_futures_info.append(info)
            known_symbols.add(info["underlying_symbol"])

    futures_order_book_id = rqdatac.all_instruments(type='Future')['order_book_id'].unique()
    commission_df = rqdatac.futures.get_commission_margin()
    leading_letters = re.compile(r'^[a-zA-Z]*')
    for future in futures_order_book_id:
        if future in known_order_book_ids:
            continue
        underlying_symbol = leading_letters.match(future).group()
        future_dict = {}
        commission = commission_df[commission_df['order_book_id'] == future]
        if not commission.empty:
            # Commission data exists for this very contract: store a per-contract entry.
            future_dict['order_book_id'] = future
            commission = commission.iloc[0]
            for p in param:
                future_dict[p] = commission[p]
            instrument_data = rqdatac.instruments(future)
            future_dict['margin_rate'] = instrument_data.margin_rate
            future_dict['tick_size'] = instrument_data.tick_size()
        elif underlying_symbol in known_symbols:
            continue
        else:
            # No per-contract data: describe the underlying symbol through its dominant contract.
            known_symbols.add(underlying_symbol)
            future_dict['underlying_symbol'] = underlying_symbol
            try:
                dominant = rqdatac.futures.get_dominant(underlying_symbol).iloc[-1]
            except AttributeError:
                # FIXME: why get_dominant return None???
                continue

            dominant_indexer = commission_df["order_book_id"] == dominant
            if not dominant_indexer.any():
                # S0301: the last soybean futures contract; it appears in the
                # instruments but no commission can be fetched for it, so this
                # case is ignored.
                continue
            commission = commission_df[dominant_indexer].iloc[0]

            for p in param:
                future_dict[p] = commission[p]
            instrument_data = rqdatac.instruments(dominant)
            future_dict['margin_rate'] = instrument_data.margin_rate
            future_dict['tick_size'] = instrument_data.tick_size()
        all_futures_info.append(future_dict)

    with open(future_info_file, 'w') as f:
        json.dump(all_futures_info, f, separators=(',', ':'), indent=2)
×
328

329

330
class GenerateFileTask(ProgressedTask):
    """Progressed task that runs one callable and reports it as a fixed 100 steps."""

    def __init__(self, func, *args, **kwargs):
        self._func, self._args, self._kwargs = func, args, kwargs
        # All progress is reported at once after the callable returns.
        self._step = 100

    @property
    def total_steps(self) -> int:
        """Number of progress steps this task reports."""
        return self._step

    def __call__(self):
        """Run the wrapped callable, then yield the whole step count."""
        self._func(*self._args, **self._kwargs)
        yield self._step
×
345

346

347
# Daily-bar fields requested per instrument type.
STOCK_FIELDS = [
    'open', 'close', 'high', 'low', 'prev_close',
    'limit_up', 'limit_down', 'volume', 'total_turnover',
]
INDEX_FIELDS = [
    'open', 'close', 'high', 'low', 'prev_close',
    'volume', 'total_turnover',
]
# Futures: the stock fields plus settlement and open-interest columns.
FUTURES_FIELDS = STOCK_FIELDS + ['settlement', 'prev_settlement', 'open_interest']
# Funds share the very same list object as stocks.
FUND_FIELDS = STOCK_FIELDS
8✔
351

352

353
class DayBarTask(ProgressedTask):
    """Base class of the daily-bar tasks; subclasses implement ``__call__``."""

    def __init__(self, order_book_ids, file_path: str, fields: List[str], market="cn", **h5_kwargs):
        """
        :param order_book_ids: ids of the instruments to process
        :param file_path: path of the target HDF5 file
        :param fields: bar fields to request
        :param market: value passed as ``market`` to ``rqdatac.get_price`` by the subclasses
        :param h5_kwargs: extra keyword arguments passed to ``create_dataset`` by the subclasses
        """
        self._order_book_ids = order_book_ids
        self._file_path = file_path
        self._fields = fields
        self._market = market
        self._h5_kwargs = h5_kwargs

    @property
    def total_steps(self) -> int:
        """One progress step per order_book_id."""
        return len(self._order_book_ids)

    def __call__(self):
        raise NotImplementedError
×
368

369

370
class GenerateDayBarTask(DayBarTask):
    """Create the daily-bar HDF5 file from scratch, fetching ids in batches of 300."""

    def __call__(self):
        """Generator yielding the number of order_book_ids handled per batch.

        If the file cannot be opened for writing, the failure is logged, the
        shared success flag ``sval`` is cleared and a single step is yielded.
        """
        try:
            h5 = h5py.File(self._file_path, "w")
        except OSError:
            system_log.error("File {} update failed, if it is using, please update later, "
                            "or you can delete then update again".format(self._file_path))
            sval.value = False
            yield 1
            return

        batch_size = 300
        total = len(self._order_book_ids)
        with h5:
            # max(total, 1) keeps the single (empty) batch that is also
            # processed when there are no order_book_ids at all.
            for offset in range(0, max(total, 1), batch_size):
                batch = self._order_book_ids[offset:offset + batch_size]
                bars = rqdatac.get_price(
                    batch, START_DATE, datetime.date.today(), '1d',
                    adjust_type='none', fields=self._fields, expect_df=True, market=self._market
                )
                if bars is not None and not bars.empty:
                    bars.reset_index(inplace=True)
                    bars['datetime'] = [convert_date_to_int(day) for day in bars['date']]
                    del bars['date']
                    bars.set_index(['order_book_id', 'datetime'], inplace=True)
                    bars.sort_index(inplace=True)
                    for order_book_id in bars.index.levels[0]:
                        records = bars.loc[order_book_id].to_records()
                        h5.create_dataset(order_book_id, data=records, **self._h5_kwargs)
                yield len(batch)
×
398

399

400
class UpdateDayBarTask(DayBarTask):
    """Incrementally update an existing daily-bar HDF5 file.

    Falls back to ``GenerateDayBarTask`` when the file cannot be read or when
    its stored fields differ from the wanted ones.
    """

    def h5_has_valid_fields(self, h5, wanted_fields):
        """Tell whether the first dataset of ``h5`` has exactly ``wanted_fields`` plus ``datetime``.

        Only the first key of the file is inspected. An empty file yields False.
        """
        obid_gen = (k for k in h5.keys())
        wanted_fields = set(wanted_fields)
        wanted_fields.add('datetime')
        try:
            h5_fields = set(h5[next(obid_gen)].dtype.fields.keys())
        except StopIteration:
            pass
        else:
            return h5_fields == wanted_fields
        return False

    def __call__(self):
        """Generator yielding one step per processed order_book_id.

        When the file cannot be opened or read, the failure is logged, the
        shared success flag ``sval`` is cleared and one step is yielded.
        """
        need_recreate_h5 = False
        try:
            with h5py.File(self._file_path, 'r') as h5:
                need_recreate_h5 = not self.h5_has_valid_fields(h5, self._fields)
        except (OSError, RuntimeError):
            need_recreate_h5 = True
        if need_recreate_h5:
            yield from GenerateDayBarTask(self._order_book_ids, self._file_path, self._fields, self._market, **self._h5_kwargs)()
        else:
            h5 = None
            try:
                h5 = h5py.File(self._file_path, 'a')
            except OSError:
                system_log.error("File {} update failed, if it is using, please update later, "
                                "or you can delete then update again".format(self._file_path))
                sval.value = False
                yield 1
            else:
                is_futures = "futures" == os.path.basename(self._file_path).split(".")[0]
                for order_book_id in self._order_book_ids:
                    # Pre-adjusted contracts are handled specially: they need a full update.
                    is_pre = is_futures and "888" in order_book_id
                    if order_book_id in h5 and not is_pre:
                        try:
                            # Stored datetimes are integers; dropping the last six digits gives the date.
                            last_date = int(h5[order_book_id]['datetime'][-1] // 1000000)
                        except OSError:
                            system_log.error("File {} update failed, if it is using, please update later, "
                                            "or you can delete then update again".format(self._file_path))
                            sval.value = False
                            yield 1
                            break
                        except ValueError:
                            # The stored dataset could not be read this way: drop it and start over.
                            h5.pop(order_book_id)
                            start_date = START_DATE
                        else:
                            start_date = rqdatac.get_next_trading_date(last_date)
                    else:
                        start_date = START_DATE
                    df = rqdatac.get_price(order_book_id, start_date, END_DATE, '1d',
                                        adjust_type='none', fields=self._fields, expect_df=True, market=self._market)
                    if not (df is None or df.empty):
                        df = df[self._fields]  # Future order_book_id like SC888 will auto add 'dominant_id'
                        df = df.loc[order_book_id]
                        df.reset_index(inplace=True)
                        df['datetime'] = [convert_date_to_int(d) for d in df['date']]
                        del df['date']
                        df.set_index('datetime', inplace=True)
                        if order_book_id in h5:
                            # A full update (is_pre) downloads the whole history again, so
                            # the stored rows must be replaced; appending them would
                            # duplicate every bar on each run.
                            existing_rows = () if is_pre else h5[order_book_id][:]
                            data = np.array(
                                [tuple(i) for i in chain(existing_rows, df.to_records())],
                                dtype=h5[order_book_id].dtype
                            )
                            del h5[order_book_id]
                            h5.create_dataset(order_book_id, data=data, **self._h5_kwargs)
                        else:
                            h5.create_dataset(order_book_id, data=df.to_records(), **self._h5_kwargs)
                    yield 1
            finally:
                if h5:
                    h5.close()
×
474

475

476
def process_init(args: Optional[Synchronized] = None, kwargs = None):
    """Initialiser run in each worker process.

    Initialises rqdatac with ``kwargs``, sets up logging and, when ``args`` is
    given, publishes it as the module-level ``sval``.

    :param args: process-shared value to expose as ``sval``
    :param kwargs: keyword arguments for ``rqdatac.init``
    """
    import warnings

    init_kwargs = kwargs if kwargs else {}
    with warnings.catch_warnings(record=True):
        # catch warning: rqdatac is already inited. Settings will be changed
        rqdatac.init(**init_kwargs)
    init_logger()

    # Initialize process shared variables
    if args:
        global sval
        sval = args
×
487

488

489
def gather_tasks(path: str, create: bool, enable_compression: bool, **h5_kwargs) -> List[ProgressedTask]:
    """Build the list of tasks that create or update a bundle under ``path``.

    The list holds, in order: one daily-bar task per instrument type, one
    ``GenerateFileTask`` per ``gen_*`` function, then the dividend, split and
    ex-factor bundle generators.

    :param path: bundle directory
    :param create: use ``GenerateDayBarTask`` when true, else ``UpdateDayBarTask``
    :param enable_compression: when true, daily-bar datasets are created with
        ``compression=9`` unless ``h5_kwargs`` already sets ``compression``
    :param h5_kwargs: extra keyword arguments forwarded to the daily-bar tasks
    :return: the tasks, not yet executed
    """
    _DayBarTask = GenerateDayBarTask if create else UpdateDayBarTask

    init_logger()
    day_bar_args = (
        ("stocks.h5", rqdatac.all_instruments('CS').order_book_id.tolist(), STOCK_FIELDS),
        ("indexes.h5", rqdatac.all_instruments('INDX').order_book_id.tolist(), INDEX_FIELDS),
        ("futures.h5", rqdatac.all_instruments('Future').order_book_id.tolist(), FUTURES_FIELDS),
        ("funds.h5", rqdatac.all_instruments('FUND').order_book_id.tolist(), FUND_FIELDS),
    )

    gen_file_funcs = (
        gen_instruments, gen_trading_dates, gen_st_days,
        gen_suspended_days, gen_yield_curve, gen_share_transformation, gen_future_info
    )

    # The compression setting used to be stored in a local dict that was never
    # passed on, so enable_compression had no effect. An explicit
    # ``compression`` supplied by the caller still wins.
    if enable_compression:
        h5_kwargs.setdefault('compression', 9)

    # On Windows the child processes need to run rqdatac.init, on other OSes
    # rqdatac.reset; rqdatac.init includes what rqdatac.reset does.
    tasks = []
    for file, order_book_id, field in day_bar_args:
        tasks.append(_DayBarTask(order_book_id, os.path.join(path, file), field, **h5_kwargs))
    for func in gen_file_funcs:
        tasks.append(GenerateFileTask(func, path))
    tasks.append(GenerateFileTask(GenerateDividendBundle(path)))
    tasks.append(GenerateFileTask(GenerateSplitBundle(path)))
    tasks.append(GenerateFileTask(GenerateExFactorBundle(path)))
    return tasks
×
520

521

522
def run_tasks(tasks: List[ProgressedTask], concurrency: int = 1, **rqdatac_kwargs):
    """Submit every task to a ``ProgressedProcessPoolExecutor`` and report success.

    :param tasks: tasks to execute
    :param concurrency: ``max_workers`` of the executor
    :param rqdatac_kwargs: passed to ``process_init`` together with the shared flag
    :return: value of the shared boolean flag, which starts out True
    """
    succeed = multiprocessing.Value(c_bool, True)
    executor_options = dict(
        max_workers=concurrency,
        initializer=process_init,
        initargs=(succeed, rqdatac_kwargs),
    )
    with ProgressedProcessPoolExecutor(**executor_options) as executor:
        for task in tasks:
            executor.submit(task)
    return succeed.value
×
530

531

532
def update_bundle(path, create, enable_compression=False, concurrency=1, rqdata_kwargs=None, **h5_kwargs):
    """Gather the bundle tasks for ``path`` and run them.

    :param path: bundle directory
    :param create: forwarded to ``gather_tasks``
    :param enable_compression: forwarded to ``gather_tasks``
    :param concurrency: forwarded to ``run_tasks``
    :param rqdata_kwargs: keyword arguments forwarded to ``run_tasks``; ``None`` means none
    :param h5_kwargs: forwarded to ``gather_tasks``
    :return: the result of ``run_tasks``
    """
    tasks = gather_tasks(path, create, enable_compression, **h5_kwargs)
    if not rqdata_kwargs:
        rqdata_kwargs = {}
    return run_tasks(tasks, concurrency, **rqdata_kwargs)
×
536

537

538
class AutomaticUpdateBundle(object):
    """HDF5-backed store of per-instrument data that is filled on first access.

    Data for an instrument is downloaded through ``api`` the first time it is
    requested and appended to the dataset named after its order_book_id.
    Writes are guarded by a file lock next to the data file.
    """

    def __init__(self, path: str, filename: str, api: Callable, fields: List[str], end_date: datetime.date, start_date: Union[int, datetime.date] = START_DATE) -> None:
        """
        :param path: directory of the data file; created when missing
        :param filename: name of the HDF5 file inside ``path``
        :param api: callable invoked as ``api(order_book_id, start_date, end_date, fields)``
        :param fields: fields to request and store
        :param end_date: accepted but not stored by this class
        :param start_date: first date to request when nothing is stored yet
        """
        if not os.path.exists(path):
            os.makedirs(path)
        self._file = os.path.join(path, filename)
        self._trading_dates = None
        self._filename = filename
        self._api = api
        self._fields = fields
        self._start_date = start_date
        # order_book_ids already brought up to date by this instance
        self.updated = []
        self._env = Environment.get_instance()
        self._file_lock = FileLock(self._file + ".lock")

    def get_data(self, instrument: Instrument, dt: datetime.date) -> Optional[np.ndarray]:
        """Return the stored record located by ``np.searchsorted`` for ``dt``.

        :return: the record, or ``None`` when no data is stored or the
            position found lies past the last record
        """
        dt = convert_date_to_date_int(dt)
        data = self._get_data_all_time(instrument)
        if data is None:
            return data
        else:
            try:
                data = data[np.searchsorted(data['trading_dt'], dt)]
            except IndexError:
                data = None
            return data

    @lru_cache(128)
    def _get_data_all_time(self, instrument: Instrument) -> Optional[np.ndarray]:
        """Return every stored record of ``instrument``, updating the file on first use.

        :return: the records, or ``None`` when the dataset is empty
        """
        if instrument.order_book_id not in self.updated:
            self._auto_update_task(instrument)
            self.updated.append(instrument.order_book_id)
        with h5py.File(self._file, "r") as h5:
            data = h5[instrument.order_book_id][:]
            if len(data) == 0:
                return None
        return data

    def _auto_update_task(self, instrument: Instrument) -> None:
        """
        Automatically update the required daily bar data while an rqalpha strategy is running.

        :param instrument: the instrument object
        :type instrument: `Instrument`
        :raises OSError: when the lock or the data file cannot be used
        """
        order_book_id = instrument.order_book_id
        start_date = self._start_date
        try:
            # The file is opened inside the lock and closed by the context
            # manager before the lock is released. Previously a failing
            # acquire()/open reached ``finally: h5.close()`` with ``h5``
            # unbound and raised UnboundLocalError instead of the real error.
            with self._file_lock.acquire(), h5py.File(self._file, "a") as h5:
                if order_book_id in h5 and h5[order_book_id].dtype.names:
                    if 'trading_dt' in h5[order_book_id].dtype.names:
                        # Must stay compatible with data written by older versions,
                        # whose field name differs (such datasets are dropped below).
                        if len(h5[order_book_id][:]) != 0:
                            last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['trading_dt']), "%Y%m%d").date()
                            if last_date >= self._env.config.base.end_date:
                                return
                            start_date = self._env.data_proxy._data_source.get_next_trading_date(last_date).date()
                            if start_date > self._env.config.base.end_date:
                                return
                    else:
                        del h5[order_book_id]

                arr = self._get_array(instrument, start_date)
                if arr is None:
                    if order_book_id not in h5:
                        # Store an empty dataset so that later reads find the key.
                        arr = np.array([])
                        h5.create_dataset(order_book_id, data=arr)
                else:
                    if order_book_id in h5:
                        data = np.array(
                            [tuple(i) for i in chain(h5[order_book_id][:], arr)],
                            dtype=h5[order_book_id].dtype)
                        del h5[order_book_id]
                        h5.create_dataset(order_book_id, data=data)
                    else:
                        h5.create_dataset(order_book_id, data=arr)
        except (OSError, Timeout) as e:
            # Translate the template first, then format it, so the gettext
            # lookup key does not depend on the file name.
            raise OSError(_("File {} update failed, if it is using, please update later, "
                            "or you can delete then update again").format(self._file)) from e

    def _get_array(self, instrument: Instrument, start_date: Union[int, datetime.date]) -> Optional[np.ndarray]:
        """Download data from ``start_date`` to the configured end date as a structured array.

        The array has a ``trading_dt`` integer field followed by one field per
        entry of ``self._fields``.

        :return: the array, or ``None`` when the api returns nothing
        """
        df = self._api(instrument.order_book_id, start_date, self._env.config.base.end_date, self._fields)
        if not (df is None or df.empty):
            df = df[self._fields].loc[instrument.order_book_id] # rqdatac.get_open_auction_info get Futures's data will auto add 'open_interest' and 'prev_settlement'
            # The first row is only used to learn the dtype of each field.
            record = df.iloc[0: 1].to_records()
            dtype = [('trading_dt', 'int')]
            for field in self._fields:
                dtype.append((field, record.dtype[field]))
            trading_dt = self._env.data_proxy._data_source.batch_get_trading_date(df.index)
            trading_dt = convert_date_to_date_int(trading_dt)
            arr = np.ones((trading_dt.shape[0], ), dtype=dtype)
            arr['trading_dt'] = trading_dt
            for field in self._fields:
                arr[field] = df[field].values
            return arr
        return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc