Skip to content

Commit

Permalink
更新bundle数据支持将错误统一输出
Browse files Browse the repository at this point in the history
  • Loading branch information
Lin-Dongzhao committed Aug 14, 2024
1 parent 762bc15 commit a7e6d69
Showing 1 changed file with 39 additions and 24 deletions.
63 changes: 39 additions & 24 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from rqalpha.utils.datetime_func import convert_date_to_date_int, convert_date_to_int
from rqalpha.utils.i18n import gettext as _
from rqalpha.utils.functools import lru_cache
from rqalpha.utils.logger import init_logger, system_log
from rqalpha.environment import Environment
from rqalpha.model.instrument import Instrument

Expand Down Expand Up @@ -310,24 +311,29 @@ def __call__(self, path, fields, **kwargs):

class GenerateDayBarTask(DayBarTask):
def __call__(self, path, fields, **kwargs):
with h5py.File(path, 'w') as h5:
i, step = 0, 300
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
del df['date']
df.set_index(['order_book_id', 'datetime'], inplace=True)
df.sort_index(inplace=True)
for order_book_id in df.index.levels[0]:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
i += step
yield len(order_book_ids)
if i >= len(self._order_book_ids):
break
try:
with h5py.File(path, 'w') as h5:
i, step = 0, 300
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
del df['date']
df.set_index(['order_book_id', 'datetime'], inplace=True)
df.sort_index(inplace=True)
for order_book_id in df.index.levels[0]:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs)
i += step
yield len(order_book_ids)
if i >= len(self._order_book_ids):
break
except OSError:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
yield 1


class UpdateDayBarTask(DayBarTask):
Expand Down Expand Up @@ -356,9 +362,10 @@ def __call__(self, path, fields, **kwargs):
try:
h5 = h5py.File(path, 'a')
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
try:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
yield 1
else:
is_futures = "futures" == os.path.basename(path).split(".")[0]
for order_book_id in self._order_book_ids:
# 特殊处理前复权合约,需要全量更新
Expand All @@ -367,8 +374,10 @@ def __call__(self, path, fields, **kwargs):
try:
last_date = int(h5[order_book_id]['datetime'][-1] // 1000000)
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
yield 1
break
except ValueError:
h5.pop(order_book_id)
start_date = START_DATE
Expand Down Expand Up @@ -406,12 +415,18 @@ def init_rqdatac_with_warnings_catch():
rqdatac.init()


def process_init_func():
init_rqdatac_with_warnings_catch()
init_logger()


def update_bundle(path, create, enable_compression=False, concurrency=1):
if create:
_DayBarTask = GenerateDayBarTask
else:
_DayBarTask = UpdateDayBarTask

init_logger()
kwargs = {}
if enable_compression:
kwargs['compression'] = 9
Expand All @@ -431,7 +446,7 @@ def update_bundle(path, create, enable_compression=False, concurrency=1):
)

with ProgressedProcessPoolExecutor(
max_workers=concurrency, initializer=init_rqdatac_with_warnings_catch
max_workers=concurrency, initializer=process_init_func
) as executor:
# windows上子进程需要执行rqdatac.init, 其他os则需要执行rqdatac.reset; rqdatac.init包含了rqdatac.reset的功能
for func in gen_file_funcs:
Expand Down

0 comments on commit a7e6d69

Please # to comment.