• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ricequant / rqalpha / 10504041958

22 Aug 2024 07:52AM UTC coverage: 64.949% (+0.04%) from 64.911%
10504041958

Pull #896

github

web-flow
Merge a7da46f69 into e5a93554a
Pull Request #896: 存在更新失败的情况时程序状态码不为0

4 of 16 new or added lines in 2 files covered. (25.0%)

3 existing lines in 1 file now uncovered.

6704 of 10322 relevant lines covered (64.95%)

3.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.18
/rqalpha/data/bundle.py
1
# -*- coding: utf-8 -*-
2
# 版权所有 2019 深圳米筐科技有限公司(下称“米筐科技”)
3
#
4
# 除非遵守当前许可,否则不得使用本软件。
5
#
6
#     * 非商业用途(非商业用途指个人出于非商业目的使用本软件,或者高校、研究所等非营利机构出于教育、科研等目的使用本软件):
7
#         遵守 Apache License 2.0(下称“Apache 2.0 许可”),您可以在以下位置获得 Apache 2.0 许可的副本:
8
#         http://www.apache.org/licenses/LICENSE-2.0。
9
#         除非法律有要求或以书面形式达成协议,否则本软件分发时需保持当前许可“原样”不变,且不得附加任何条件。
10
#
11
#     * 商业用途(商业用途指个人出于任何商业目的使用本软件,或者法人或其他组织出于任何目的使用本软件):
12
#         未经米筐科技授权,任何个人不得出于任何商业目的使用本软件(包括但不限于向第三方提供、销售、出租、出借、转让本软件、本软件的衍生产品、引用或借鉴了本软件功能或源代码的产品或服务),任何法人或其他组织不得出于任何目的使用本软件,否则米筐科技有权追究相应的知识产权侵权责任。
13
#         在此前提下,对本软件的使用同样需要遵守 Apache 2.0 许可,Apache 2.0 许可与本许可冲突之处,以本许可为准。
14
#         详细的授权流程,请联系 public@ricequant.com 获取。
15
import datetime
6✔
16
import json
6✔
17
import os
6✔
18
import pickle
6✔
19
import re
6✔
20
from itertools import chain
6✔
21
from typing import Callable, Optional, Union, List
6✔
22
from filelock import FileLock, Timeout
6✔
23

24
import h5py
6✔
25
import numpy as np
6✔
26
from rqalpha.apis.api_rqdatac import rqdatac
6✔
27
from rqalpha.utils.concurrent import ProgressedProcessPoolExecutor, ProgressedTask
6✔
28
from rqalpha.utils.datetime_func import convert_date_to_date_int, convert_date_to_int
6✔
29
from rqalpha.utils.i18n import gettext as _
6✔
30
from rqalpha.utils.functools import lru_cache
6✔
31
from rqalpha.utils.logger import init_logger, system_log
6✔
32
from rqalpha.environment import Environment
6✔
33
from rqalpha.model.instrument import Instrument
6✔
34
import multiprocessing
6✔
35
from multiprocessing.sharedctypes import Synchronized
6✔
36

37
# Date range (yyyymmdd ints) covered by the data bundle.
START_DATE = 20050104  # earliest date for which bundle data is fetched
END_DATE = 29991231    # sentinel "far future" end date
39

40

41
def gen_instruments(d):
    """Dump metadata of every instrument to <d>/instruments.pk as a pickled list of dicts."""
    order_book_ids = sorted(rqdatac.all_instruments().order_book_id)
    records = [ins.__dict__ for ins in rqdatac.instruments(order_book_ids)]
    target = os.path.join(d, 'instruments.pk')
    with open(target, 'wb') as fp:
        # protocol 2 keeps the file readable by older unpicklers
        pickle.dump(records, fp, protocol=2)
46

47

48
def gen_yield_curve(d):
    """Fetch the yield curve from START_DATE to today and store it in <d>/yield_curve.h5."""
    curve = rqdatac.get_yield_curve(start_date=START_DATE, end_date=datetime.date.today())
    # index becomes yyyymmdd ints, named 'date'
    curve.index = [convert_date_to_date_int(dt) for dt in curve.index]
    curve.index.name = 'date'
    with h5py.File(os.path.join(d, 'yield_curve.h5'), 'w') as h5:
        h5.create_dataset('data', data=curve.to_records())
54

55

56
def gen_trading_dates(d):
    """Store the trading calendar (as yyyymmdd ints) in <d>/trading_dates.npy."""
    trading_dates = rqdatac.get_trading_dates(start_date=START_DATE, end_date='2999-01-01')
    as_ints = np.array([convert_date_to_date_int(dt) for dt in trading_dates])
    np.save(os.path.join(d, 'trading_dates.npy'), as_ints, allow_pickle=False)
60

61

62
def gen_st_days(d):
    """Write per-stock ST (special treatment) day lists to <d>/st_stock_days.h5."""
    from rqdatac.client import get_client
    order_book_ids = rqdatac.all_instruments('CS').order_book_id.tolist()
    today_int = convert_date_to_date_int(datetime.date.today())
    st_days = get_client().execute('get_st_days', order_book_ids, START_DATE, today_int)
    with h5py.File(os.path.join(d, 'st_stock_days.h5'), 'w') as f:
        for obid, days in st_days.items():
            f[obid] = days
70

71

72
def gen_suspended_days(d):
    """Write per-stock trading-suspension day lists to <d>/suspended_days.h5."""
    from rqdatac.client import get_client
    order_book_ids = rqdatac.all_instruments('CS').order_book_id.tolist()
    today_int = convert_date_to_date_int(datetime.date.today())
    suspended_days = get_client().execute('get_suspended_days', order_book_ids, START_DATE, today_int)
    with h5py.File(os.path.join(d, 'suspended_days.h5'), 'w') as f:
        for obid, days in suspended_days.items():
            f[obid] = days
80

81

82
def gen_dividends(d):
    """Write per-stock dividend records to <d>/dividends.h5, one dataset per order_book_id."""
    order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
    df = rqdatac.get_dividend(order_book_ids)
    df = df[["dividend_cash_before_tax", "book_closure_date", "ex_dividend_date", "payable_date", "round_lot"]]
    df.reset_index(inplace=True)
    df.rename(columns={'declaration_announcement_date': 'announcement_date'}, inplace=True)
    # date columns are stored as yyyymmdd ints
    for col in ('book_closure_date', 'ex_dividend_date', 'payable_date', 'announcement_date'):
        df[col] = [convert_date_to_date_int(dt) for dt in df[col]]
    df.set_index(['order_book_id', 'book_closure_date'], inplace=True)
    with h5py.File(os.path.join(d, 'dividends.h5'), 'w') as h5:
        for obid in df.index.levels[0]:
            h5[obid] = df.loc[obid].to_records()
95

96

97
def gen_splits(d):
    """Write per-stock split factors to <d>/split_factor.h5, one dataset per order_book_id."""
    order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
    df = rqdatac.get_split(order_book_ids)
    # e.g. a 10-for-1 split yields split_factor == 10
    df['split_factor'] = df['split_coefficient_to'] / df['split_coefficient_from']
    df = df[['split_factor']]
    df.reset_index(inplace=True)
    df.rename(columns={'ex_dividend_date': 'ex_date'}, inplace=True)
    df['ex_date'] = [convert_date_to_int(dt) for dt in df['ex_date']]
    df.set_index(['order_book_id', 'ex_date'], inplace=True)

    with h5py.File(os.path.join(d, 'split_factor.h5'), 'w') as h5:
        for obid in df.index.levels[0]:
            h5[obid] = df.loc[obid].to_records()
110

111

112
def gen_ex_factor(d):
    """Write cumulative ex-dividend factors to <d>/ex_cum_factor.h5.

    Each dataset is prefixed with a sentinel row (start_date=0, ex_cum_factor=1.0)
    so lookups before the first real ex-date resolve to a factor of 1.
    """
    order_book_ids = rqdatac.all_instruments().order_book_id.tolist()
    df = rqdatac.get_ex_factor(order_book_ids)
    df.reset_index(inplace=True)
    df['ex_date'] = [convert_date_to_int(dt) for dt in df['ex_date']]
    df.rename(columns={'ex_date': 'start_date'}, inplace=True)
    df.set_index(['order_book_id', 'start_date'], inplace=True)
    df = df[['ex_cum_factor']]

    record_dtype = df.loc[df.index.levels[0][0]].to_records().dtype
    sentinel = np.empty((1,), dtype=record_dtype)
    sentinel['start_date'] = 0
    sentinel['ex_cum_factor'] = 1.0

    with h5py.File(os.path.join(d, 'ex_cum_factor.h5'), 'w') as h5:
        for obid in df.index.levels[0]:
            h5[obid] = np.concatenate([sentinel, df.loc[obid].to_records()])
129

130

131
def gen_share_transformation(d):
    """Dump the share-transformation (code change) mapping to <d>/share_transformation.json."""
    df = rqdatac.get_share_transformation()
    df.drop_duplicates("predecessor", inplace=True)
    df.set_index('predecessor', inplace=True)
    # dates serialized as strings for JSON round-tripping
    df.effective_date = df.effective_date.astype(str)
    df.predecessor_delisted_date = df.predecessor_delisted_date.astype(str)

    target = os.path.join(d, 'share_transformation.json')
    with open(target, 'w') as fp:
        fp.write(df.to_json(orient='index'))
141

142

143
def gen_future_info(d):
    """Generate <d>/future_info.json: commission / margin-rate / tick-size info for futures.

    The file is assembled from three sources, in order:
    1. an existing future_info.json (patched in place with margin_rate if missing),
    2. the hard-coded entries below (for delisted/renamed underlyings),
    3. data queried from rqdatac for every other listed future.
    """
    future_info_file = os.path.join(d, 'future_info.json')

    def _need_to_recreate():
        # True when an existing file lacks the margin_rate field and must be patched;
        # None (falsy) when the file is absent or already up to date.
        if not os.path.exists(future_info_file):
            return
        else:
            with open(future_info_file, "r") as f:
                all_futures_info = json.load(f)
                # NOTE(review): assumes the JSON list is non-empty and all entries share
                # the same schema — an empty list would raise IndexError here; confirm.
                if "margin_rate" not in all_futures_info[0]:
                    return True
    
    def update_margin_rate(file):
        # Backfill margin_rate for every entry, then rewrite the file in place.
        all_instruments_data = rqdatac.all_instruments("Future")
        with open(file, "r") as f:
            all_futures_info = json.load(f)
            new_all_futures_info = []
            for future_info in all_futures_info:
                if "order_book_id" in future_info:
                    future_info["margin_rate"] = all_instruments_data[all_instruments_data["order_book_id"] == future_info["order_book_id"]].iloc[0].margin_rate
                elif "underlying_symbol" in future_info:
                    # symbol-level entries borrow the latest dominant contract's margin rate
                    dominant = rqdatac.futures.get_dominant(future_info["underlying_symbol"])[-1]
                    future_info["margin_rate"] = all_instruments_data[all_instruments_data["order_book_id"] == dominant].iloc[0].margin_rate
                new_all_futures_info.append(future_info)
        os.remove(file)
        with open(file, "w") as f:
            json.dump(new_all_futures_info, f, separators=(',', ':'), indent=2)

    if (_need_to_recreate()): update_margin_rate(future_info_file)

    # When adding new hard-coded underlyings, also update
    # rqalpha.data.base_data_source.storages.FutureInfoStore.data_compatible accordingly.
    hard_code = [
        {'underlying_symbol': 'TC',
          'close_commission_ratio': 4.0,
          'close_commission_today_ratio': 0.0,
          'commission_type': "by_volume",
          'open_commission_ratio': 4.0,
          'margin_rate': 0.05,
          'tick_size': 0.2},
         {'underlying_symbol': 'ER',
          'close_commission_ratio': 2.5,
          'close_commission_today_ratio': 2.5,
          'commission_type': "by_volume",
          'open_commission_ratio': 2.5,
          'margin_rate': 0.05,
          'tick_size': 1.0},
         {'underlying_symbol': 'WS',
          'close_commission_ratio': 2.5,
          'close_commission_today_ratio': 0.0,
          'commission_type': "by_volume",
          'open_commission_ratio': 2.5,
          'margin_rate': 0.05,
          'tick_size': 1.0},
         {'underlying_symbol': 'RO',
          'close_commission_ratio': 2.5,
          'close_commission_today_ratio': 0.0,
          'commission_type': "by_volume",
          'open_commission_ratio': 2.5,
          'margin_rate': 0.05,
          'tick_size': 2.0},
         {'underlying_symbol': 'ME',
          'close_commission_ratio': 1.4,
          'close_commission_today_ratio': 0.0,
          'commission_type': "by_volume",
          'open_commission_ratio': 1.4,
          'margin_rate': 0.06,
          'tick_size': 1.0},
        {'underlying_symbol': 'WT',
         'close_commission_ratio': 5.0,
         'close_commission_today_ratio': 5.0,
         'commission_type': "by_volume",
         'open_commission_ratio': 5.0,
         'margin_rate': 0.05,
         'tick_size': 1.0},
    ]

    if not os.path.exists(future_info_file):
        all_futures_info = hard_code
    else:
        with open(future_info_file, 'r') as f:
            all_futures_info = json.load(f)

    future_list = []  # order_book_ids already present in the file
    symbol_list = []  # underlying symbols already present in the file
    param = ['close_commission_ratio', 'close_commission_today_ratio', 'commission_type', 'open_commission_ratio']

    for i in all_futures_info:
        if i.get('order_book_id'):
            future_list.append(i.get('order_book_id'))
        else:
            symbol_list.append(i.get('underlying_symbol'))

    # If hard_code was modified, merge the new entries here so users don't have to
    # delete future_info.json manually.
    for info in hard_code:
        if info["underlying_symbol"] not in symbol_list:
            all_futures_info.append(info)
            symbol_list.append(info["underlying_symbol"])

    futures_order_book_id = rqdatac.all_instruments(type='Future')['order_book_id'].unique()
    commission_df = rqdatac.futures.get_commission_margin()
    for future in futures_order_book_id:
        # leading letters of the order_book_id form the underlying symbol, e.g. "SC" of "SC2409"
        underlying_symbol = re.match(r'^[a-zA-Z]*', future).group()
        if future in future_list:
            continue
        future_dict = {}
        commission = commission_df[commission_df['order_book_id'] == future]
        if not commission.empty:
            # contract-level commission data available: store a contract-level entry
            future_dict['order_book_id'] = future
            commission = commission.iloc[0]
            for p in param:
                future_dict[p] = commission[p]
            instruemnts_data = rqdatac.instruments(future)
            future_dict['margin_rate'] = instruemnts_data.margin_rate
            future_dict['tick_size'] = instruemnts_data.tick_size()
        elif underlying_symbol in symbol_list:
            continue
        else:
            # no contract-level data: fall back to a symbol-level entry based on the
            # current dominant contract of the underlying
            symbol_list.append(underlying_symbol)
            future_dict['underlying_symbol'] = underlying_symbol
            try:
                dominant = rqdatac.futures.get_dominant(underlying_symbol).iloc[-1]
            except AttributeError:
                # FIXME: why get_dominant return None???
                continue
            commission = commission_df[commission_df['order_book_id'] == dominant].iloc[0]

            for p in param:
                future_dict[p] = commission[p]
            instruemnts_data = rqdatac.instruments(dominant)
            future_dict['margin_rate'] = instruemnts_data.margin_rate
            future_dict['tick_size'] = instruemnts_data.tick_size()
        all_futures_info.append(future_dict)

    with open(os.path.join(d, 'future_info.json'), 'w') as f:
        json.dump(all_futures_info, f, separators=(',', ':'), indent=2)
278

279

280
class GenerateFileTask(ProgressedTask):
    """Progressed task that runs one bundle-file generator function as a single step."""

    def __init__(self, func):
        # func: one of the gen_* functions; invoked with the bundle directory path
        self._func = func
        # one coarse 100-unit step covering the whole generation
        self._step = 100

    @property
    def total_steps(self):
        # type: () -> int
        return self._step

    def __call__(self, *args, **kwargs):
        # Run the wrapped generator to completion, then report all progress at once.
        self._func(*args, **kwargs)
        yield self._step
293

294

295
# Daily-bar fields stored per instrument type in the bundle's h5 files.
STOCK_FIELDS = ['open', 'close', 'high', 'low', 'prev_close', 'limit_up', 'limit_down', 'volume', 'total_turnover']
INDEX_FIELDS = ['open', 'close', 'high', 'low', 'prev_close', 'volume', 'total_turnover']  # indexes have no price limits
FUTURES_FIELDS = STOCK_FIELDS + ['settlement', 'prev_settlement', 'open_interest']
FUND_FIELDS = STOCK_FIELDS
299

300

301
class DayBarTask(ProgressedTask):
    """Base class for day-bar generate/update tasks; progress is one step per order_book_id."""

    def __init__(self, order_book_ids):
        self._order_book_ids = order_book_ids

    @property
    def total_steps(self):
        # type: () -> int
        return len(self._order_book_ids)

    def __call__(self, path, fields, **kwargs):
        # Subclasses implement the actual h5 generation/update as a generator
        # yielding the number of instruments processed per step.
        raise NotImplementedError
312

313

314
class GenerateDayBarTask(DayBarTask):
    """Rebuild a day-bar h5 file from scratch, fetching prices in chunks of 300 ids."""

    def __call__(self, path, fields, **kwargs):
        # kwargs are forwarded to h5py.create_dataset (e.g. compression settings).
        with h5py.File(path, 'w') as h5:
            i, step = 0, 300
            while True:
                order_book_ids = self._order_book_ids[i:i + step]
                df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
                                    adjust_type='none', fields=fields, expect_df=True)
                if not (df is None or df.empty):
                    df.reset_index(inplace=True)
                    # index becomes a yyyymmddHHMMSS-style int under the name 'datetime'
                    df['datetime'] = [convert_date_to_int(d) for d in df['date']]
                    del df['date']
                    df.set_index(['order_book_id', 'datetime'], inplace=True)
                    df.sort_index(inplace=True)
                    for order_book_id in df.index.levels[0]:
                        h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
                i += step
                # progress step == number of ids attempted in this chunk
                yield len(order_book_ids)
                if i >= len(self._order_book_ids):
                    break
334

335

336
class UpdateDayBarTask(DayBarTask):
    """Incrementally update a day-bar h5 file, falling back to a full rebuild when
    the stored fields no longer match the wanted ones.

    On update failure (file in use), logs an error and sets the process-shared
    status value ``sval`` (installed by ``process_init``) to 1 so the parent
    process can exit with a non-zero status code.
    """

    def h5_has_valid_fields(self, h5, wanted_fields):
        # Compare the dtype fields of the first dataset against wanted_fields
        # plus the implicit 'datetime' column; an empty file is treated as invalid.
        obid_gen = (k for k in h5.keys())
        wanted_fields = set(wanted_fields)
        wanted_fields.add('datetime')
        try:
            h5_fields = set(h5[next(obid_gen)].dtype.fields.keys())
        except StopIteration:
            pass
        else:
            return h5_fields == wanted_fields
        return False

    def __call__(self, path, fields, **kwargs):
        # Decide between full rebuild and incremental update.
        need_recreate_h5 = False
        try:
            with h5py.File(path, 'r') as h5:
                need_recreate_h5 = not self.h5_has_valid_fields(h5, fields)
        except (OSError, RuntimeError):
            # unreadable/corrupt file: rebuild from scratch
            need_recreate_h5 = True
        if need_recreate_h5:
            yield from GenerateDayBarTask(self._order_book_ids)(path, fields, **kwargs)
        else:
            h5 = None
            try:
                h5 = h5py.File(path, 'a')
            except OSError:
                system_log.error("File {} update failed, if it is using, please update later, "
                                "or you can delete then update again".format(path))
                # signal failure to the parent process via the shared status value
                sval.value = 1
                yield 1
            else:
                is_futures = "futures" == os.path.basename(path).split(".")[0]
                for order_book_id in self._order_book_ids:
                    # Pre-adjusted ("888") futures contracts need a full refresh every time.
                    is_pre = is_futures and "888" in order_book_id
                    if order_book_id in h5 and not is_pre:
                        try:
                            # last stored bar's date (yyyymmdd) from the yyyymmddHHMMSS int
                            last_date = int(h5[order_book_id]['datetime'][-1] // 1000000)
                        except OSError:
                            system_log.error("File {} update failed, if it is using, please update later, "
                                            "or you can delete then update again".format(path))
                            sval.value = 1
                            yield 1
                            break
                        except ValueError:
                            # corrupt/empty dataset: drop it and refetch in full
                            h5.pop(order_book_id)
                            start_date = START_DATE
                        else:
                            start_date = rqdatac.get_next_trading_date(last_date)
                    else:
                        start_date = START_DATE
                    df = rqdatac.get_price(order_book_id, start_date, END_DATE, '1d',
                                        adjust_type='none', fields=fields, expect_df=True)
                    if not (df is None or df.empty):
                        df = df[fields]  # Future order_book_id like SC888 will auto add 'dominant_id'
                        df = df.loc[order_book_id]
                        df.reset_index(inplace=True)
                        df['datetime'] = [convert_date_to_int(d) for d in df['date']]
                        del df['date']
                        df.set_index('datetime', inplace=True)
                        if order_book_id in h5:
                            # append by rebuilding the dataset with old + new records
                            data = np.array(
                                [tuple(i) for i in chain(h5[order_book_id][:], df.to_records())],
                                dtype=h5[order_book_id].dtype
                            )
                            del h5[order_book_id]
                            h5.create_dataset(order_book_id, data=data, **kwargs)
                        else:
                            h5.create_dataset(order_book_id, data=df.to_records(), **kwargs)
                    yield 1
            finally:
                if h5:
                    h5.close()
410

411

412
def process_init(args: Optional[Synchronized] = None):
    """Worker-process initializer: init rqdatac and logging, and install the
    process-shared status value as the module-level ``sval``."""
    global sval
    import warnings
    with warnings.catch_warnings(record=True):
        # catch warning: rqdatac is already inited. Settings will be changed
        rqdatac.init()
    init_logger()
    # Initialize process shared variables
    if args:
        sval = args
422

423

424
def update_bundle(path, create, enable_compression=False, concurrency=1):
    """Create or update the data bundle under ``path``.

    :param path: bundle directory
    :param create: True rebuilds the day-bar h5 files from scratch; False updates incrementally
    :param enable_compression: gzip-compress h5 datasets at level 9
    :param concurrency: number of worker processes
    :return: 0 on success, 1 when any day-bar file failed to update (set through the
        process-shared status value by ``UpdateDayBarTask`` in a worker)
    """
    if create:
        _DayBarTask = GenerateDayBarTask
    else:
        _DayBarTask = UpdateDayBarTask

    init_logger()
    kwargs = {}
    if enable_compression:
        kwargs['compression'] = 9

    # (filename, order_book_ids, fields) for each day-bar file
    day_bar_args = (
        ("stocks.h5", rqdatac.all_instruments('CS').order_book_id.tolist(), STOCK_FIELDS),
        ("indexes.h5", rqdatac.all_instruments('INDX').order_book_id.tolist(), INDEX_FIELDS),
        ("futures.h5", rqdatac.all_instruments('Future').order_book_id.tolist(), FUTURES_FIELDS),
        ("funds.h5", rqdatac.all_instruments('FUND').order_book_id.tolist(), FUND_FIELDS),
    )

    rqdatac.reset()

    gen_file_funcs = (
        gen_instruments, gen_trading_dates, gen_dividends, gen_splits, gen_ex_factor, gen_st_days,
        gen_suspended_days, gen_yield_curve, gen_share_transformation, gen_future_info
    )

    # shared across worker processes; a worker sets it to 1 when an update fails
    status_code = multiprocessing.Value("i", 0)
    with ProgressedProcessPoolExecutor(
            max_workers=concurrency, initializer=process_init, initargs=(status_code, )
    ) as executor:
        # On Windows child processes must call rqdatac.init; on other OSes rqdatac.reset
        # suffices. rqdatac.init covers everything rqdatac.reset does.
        for func in gen_file_funcs:
            executor.submit(GenerateFileTask(func), path)
        for file, order_book_id, field in day_bar_args:
            executor.submit(_DayBarTask(order_book_id), os.path.join(path, file), field, **kwargs)
    return status_code.value
459

460

461
class AutomaticUpdateBundle(object):
    """Incrementally fetches and caches per-instrument daily data in an h5 file
    while an rqalpha strategy is running.

    :param path: directory holding the h5 file (and its ``.lock`` companion)
    :param filename: name of the h5 file inside ``path``
    :param api: callable ``api(order_book_id, start_date, end_date, fields)``
        returning a DataFrame indexed by (order_book_id, date), or None
    :param fields: data columns to fetch and store
    :param end_date: last date (inclusive) that must be available
    :param start_date: first date fetched when an instrument has no cached data
    """

    def __init__(self, path: str, filename: str, api: Callable, fields: List[str], end_date: datetime.date, start_date: Union[int, datetime.date] = START_DATE) -> None:
        if not os.path.exists(path):
            os.makedirs(path)
        self._file = os.path.join(path, filename)
        self._trading_dates = None
        self._filename = filename
        self._api = api
        self._fields = fields
        self._start_date = start_date
        self._end_date = end_date
        self.updated = []  # order_book_ids already refreshed during this run
        self._env = Environment.get_instance()
        # inter-process lock: multiple strategy processes may share the bundle file
        self._file_lock = FileLock(self._file + ".lock")

    def get_data(self, instrument: Instrument, dt: datetime.date) -> Optional[np.ndarray]:
        """Return the cached record at the left insertion point of ``dt``.

        ``np.searchsorted`` returns the first position whose trading_dt >= dt,
        so a missing date resolves to the next trading date's record; a date past
        the last cached record raises IndexError internally and yields None.
        """
        dt = convert_date_to_date_int(dt)
        data = self._get_data_all_time(instrument)
        if data is None:
            return data
        try:
            return data[np.searchsorted(data['trading_dt'], dt)]
        except IndexError:
            return None

    # NOTE(review): lru_cache on a bound method keys on (self, instrument) and keeps
    # the instance alive for the cache's lifetime; acceptable here because this
    # object lives for the whole strategy run.
    @lru_cache(128)
    def _get_data_all_time(self, instrument: Instrument) -> Optional[np.ndarray]:
        """Load the full cached array for ``instrument``, updating the file first
        if it has not been refreshed during this run; None when empty."""
        if instrument.order_book_id not in self.updated:
            self._auto_update_task(instrument)
            self.updated.append(instrument.order_book_id)
        with h5py.File(self._file, "r") as h5:
            data = h5[instrument.order_book_id][:]
            if len(data) == 0:
                return None
        return data

    def _auto_update_task(self, instrument: Instrument) -> None:
        """
        Automatically update the daily data needed while an rqalpha strategy runs.

        :param instrument: the instrument to update
        :type instrument: `Instrument`
        :raises OSError: when the h5 file cannot be opened or the lock times out
        """
        order_book_id = instrument.order_book_id
        start_date = self._start_date
        # Bug fix: h5 was previously unbound when the lock acquisition (or the
        # h5py.File call) raised, so the finally clause raised NameError and
        # masked the real error. Initialize it and guard the close.
        h5 = None
        try:
            with self._file_lock.acquire():
                h5 = h5py.File(self._file, "a")
                if order_book_id in h5 and h5[order_book_id].dtype.names:
                    if 'trading_dt' in h5[order_book_id].dtype.names:
                        # legacy datasets lack 'trading_dt'; they are dropped below
                        # and refetched in full with the new field names
                        if len(h5[order_book_id][:]) != 0:
                            last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['trading_dt']), "%Y%m%d").date()
                            if last_date >= self._end_date:
                                return  # already up to date
                            start_date = self._env.data_proxy._data_source.get_next_trading_date(last_date).date()
                            if start_date > self._end_date:
                                return
                    else:
                        del h5[order_book_id]

                arr = self._get_array(instrument, start_date)
                if arr is None:
                    # no new data: ensure an (empty) dataset exists so readers don't KeyError
                    if order_book_id not in h5:
                        arr = np.array([])
                        h5.create_dataset(order_book_id, data=arr)
                else:
                    if order_book_id in h5:
                        # append by rebuilding the dataset with old + new records
                        data = np.array(
                            [tuple(i) for i in chain(h5[order_book_id][:], arr)],
                            dtype=h5[order_book_id].dtype)
                        del h5[order_book_id]
                        h5.create_dataset(order_book_id, data=data)
                    else:
                        h5.create_dataset(order_book_id, data=arr)
        except (OSError, Timeout) as e:
            raise OSError(_("File {} update failed, if it is using, please update later, "
                          "or you can delete then update again".format(self._file))) from e
        finally:
            if h5 is not None:
                h5.close()

    def _get_array(self, instrument: Instrument, start_date: datetime.date) -> Optional[np.ndarray]:
        """Fetch [start_date, self._end_date] via ``self._api`` and convert to a
        structured array with a leading 'trading_dt' (yyyymmdd int) field; None if empty."""
        df = self._api(instrument.order_book_id, start_date, self._end_date, self._fields)
        if df is None or df.empty:
            return None
        # rqdatac.get_open_auction_info for futures auto-adds 'open_interest' and
        # 'prev_settlement'; keep only the requested fields
        df = df[self._fields].loc[instrument.order_book_id]
        record = df.iloc[0: 1].to_records()
        dtype = [('trading_dt', 'int')]
        for field in self._fields:
            dtype.append((field, record.dtype[field]))
        trading_dt = self._env.data_proxy._data_source.batch_get_trading_date(df.index)
        trading_dt = convert_date_to_date_int(trading_dt)
        arr = np.ones((trading_dt.shape[0], ), dtype=dtype)
        arr['trading_dt'] = trading_dt
        for field in self._fields:
            arr[field] = df[field].values
        return arr
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc