@@ -119,12 +119,15 @@ class DDBHFTLoader(DDBLoader):
-        # The Pool object cannot be created repeatedly, so it has to be created
-        # once outside the outermost loop and then passed in as an argument.
-        with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-            for hft_type_name in self.hft_type_list:
-                print('Will work on hft type:', hft_type_name)
-                with tqdm(stock_list) as pbar:
-                    for stock_id in pbar:
-                        pbar.set_description(f"Working on stock {stock_id}")
-                        self.dump_hft_to_ddb(hft_type_name, stock_id, pbar=pbar, pool=pool)
+        # Always reuse the connection object, to reduce the memory consumption.
+        with self.mssql_engine.connect() as conn:
+            # Loop through the stock list.
+            for hft_type_name in self.hft_type_list:
+                print('Will work on hft type:', hft_type_name)
+                with tqdm(stock_list) as pbar:
+                    for stock_id in pbar:
+                        pbar.set_description(f"Working on stock {stock_id}")
+                        self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar, pool=pool)
     def _get_stock_date_list(self, cache=False):
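The point of this first hunk is to hoist the MSSQL connection out of the per-stock loop: previously `dump_hft_to_ddb` opened a fresh connection for every stock (see the last hunk below), whereas now one connection is opened up front and threaded through as `conn`. A minimal sketch of that reuse pattern, assuming a SQLAlchemy engine; the DSN and the `process_stock` helper are placeholders, not part of the loader:

    from sqlalchemy import create_engine, text

    engine = create_engine("mssql+pymssql://user:password@host/db")  # placeholder DSN

    def process_stock(conn, stock_id):
        # Every query in the loop goes through the one shared connection.
        return conn.execute(text("select 1")).fetchall()

    with engine.connect() as conn:          # opened once ...
        for stock_id in ["000001.SZ", "000002.SZ"]:
            process_stock(conn, stock_id)   # ... reused for every stock

Note that SQLAlchemy 1.4+ requires raw SQL to be wrapped in text(); the loader's conn.execute(stat) with a bare string relies on the older 1.x behaviour.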
@@ -354,7 +357,7 @@ class DDBHFTLoader(DDBLoader):
         print('-' * 80)

-    def dump_hft_to_ddb(self, type_name, stock_id, trade_date=None, pbar=None, pool=None):
+    def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None, pool=None):
         if (type_name, stock_id, 'OK') in self.dump_journal_df.index:
             message = f"Will skip ({type_name}, {stock_id}) as it appears in the dump journal."
             if pbar is None:
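The signature change above is the other half of the refactoring: conn becomes a required parameter instead of a resource the method creates itself. The journal check that follows is what makes the job resumable. A sketch of how such an append-only journal can work, assuming it is a plain CSV of "type_name,stock_id,status" rows (the file name and values are made up; the real dump_journal_df may be built differently):

    import pandas as pd

    journal_path = "dump_journal.csv"  # hypothetical path

    # Index by (type_name, stock_id, status) so the skip test is a cheap
    # MultiIndex membership check.
    dump_journal_df = pd.read_csv(
        journal_path, names=["type_name", "stock_id", "status"]
    ).set_index(["type_name", "stock_id", "status"])

    if ("TickQueue", "000001.SZ", "OK") in dump_journal_df.index:
        print("Will skip (TickQueue, 000001.SZ): already in the dump journal.")

This matches the "{type_name},{stock_id},OK" lines that dump_journal_writer appends at the end of the last hunk.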
@@ -376,53 +379,52 @@ class DDBHFTLoader(DDBLoader):
-        # After some experimenting, running the batch query per stock turned out
-        # to be acceptably fast. In mssql the index fields are
-        # (S_INFO_WINDCODE, TRADE_DT).
-        with self.mssql_engine.connect() as conn:
-            stat = """
-                select * from [Level2Bytes{mssql_type_name}].dbo.[{mssql_type_name}]
-                where S_INFO_WINDCODE='{stock_id}'
-            """.format(
-                mssql_type_name=self.mssql_name_dict[type_name],
-                stock_id=stock_id
-            )
-            row_list = list(conn.execute(stat).fetchall())
-            # If `_journal_dt` is not None, the journal says this stock was already
-            # partially dumped, so every date has to be checked one by one.
-            # Keep only the rows whose date is not yet in `_journal_dt`.
-            if _journal_dt is not None:
-                row_list = [row for row in row_list
-                            if pd.to_datetime(row[1]) not in _journal_dt.index]
-                print(f"Resume job for {stock_id}, with {len(row_list)} rows left.")
-            num_rows = len(row_list)
-            # Zero rows means there is no data at all, so we can return right away.
-            if num_rows == 0:
-                return
-            if pbar:
-                #pbar.set_description(f"Did get the result set for stock {stock_id} from mssql")
-                pbar.set_description(f"Will work in parallel on dumping job for {stock_id} of len {num_rows}")
-            else:
-                print(f"Did get the result set for stock {stock_id} from mssql")
-            # Each row holds all the high-frequency records of one stock on one date.
-            # Use multiprocessing to speed things up.
-            #with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-            if pool is None:
-                print("Will create a new Pool object, but this is not encouraged for large batch work.")
-                pool = Pool(self.num_workers)
-            # Within a single stock, the individual dates are processed in parallel.
-            # This keeps memory usage friendly: the full history of several stocks
-            # never has to be loaded at the same time.
-            with tqdm(total=num_rows, leave=False) as sub_pbar:
-                for _ in pool.imap_unordered(
-                        functools.partial(
-                            DDBHFTLoader.dump_stock_daily_to_ddb,
-                            type_name=type_name,
-                            stock_id=stock_id
-                        ),
-                        row_list
-                ):
-                    sub_pbar.update()
stat = """
select * from [ Level2Bytes { mssql_type_name } ] . dbo . [ { mssql_type_name } ]
where S_INFO_WINDCODE = ' {stock_id} '
""" .format(
mssql_type_name = self . mssql_name_dict [ type_name ] ,
stock_id = stock_id
)
row_list = list ( conn . execute ( stat ) . fetchall ( ) )
# 如果`_journal_dt`不为空,则说明之前的日志中表明改股票数据已经部分完成,需要逐个核对日期
# 这里只把日期值不再`_journal_dt`的记录放入`row_list`
if _journal_dt is not None :
row_list = [ row for row in row_list
if pd . to_datetime ( row [ 1 ] ) not in _journal_dt . index ]
print ( f " Resume job for { stock_id } , with { len ( row_list ) } rows left. " )
num_rows = len ( row_list )
# 如果行数为0, 则说明是空数据, 可以直接返回
if num_rows == 0 :
return
if pbar :
#pbar.set_description(f"Did get the result set for stock {stock_id} from mssql")
pbar . set_description ( f " Will work in paralle on dumping job on { stock_id } of len { num_rows } " )
else :
print ( f " Did get the result set for stock { stock_id } from mssql " )
# 每一行是当个个股某一日的所有高频交易信息
# 使用多进程来加快速度
#with Pool(self.num_workers if num_workers is None else num_workers) as pool:
if pool is None :
print ( " Will create new Pool object, but this is not encourage for large batch work. " )
pool = Pool ( self . num_worker )
# 在单个股票内部,对不同日期进行并行处理,对内存使用较为友好,不需要同时载入多个股票海量的全历史数据
with tqdm ( total = num_rows , leave = False ) as sub_pbar :
for _ in pool . imap_unordered (
functools . partial (
DDBHFTLoader . dump_stock_daily_to_ddb ,
type_name = type_name ,
stock_id = stock_id
) ,
row_list
) :
sub_pbar . update ( )
         self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n")
         self.dump_journal_writer.flush()
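For reference, the parallel section at the end of this hunk is a standard pool fan-out: a picklable worker is partially bound to (type_name, stock_id) with functools.partial and mapped over the per-date rows with imap_unordered, so the progress bar ticks as soon as any date finishes, in whatever order. Below is a self-contained, runnable sketch of the same pattern; the row layout, the "TickQueue" name, and the fake row_list are assumptions, and the worker body is a stand-in for the real DolphinDB upload:

    import functools
    from multiprocessing import Pool

    from tqdm import tqdm


    def dump_stock_daily(row, type_name, stock_id):
        # Layout assumed from the query above: row[1] is TRADE_DT.
        trade_date = row[1]
        # ... decode the Level2 payload and write it into DolphinDB here ...
        return trade_date


    if __name__ == "__main__":
        row_list = [("000001.SZ", f"2021-01-{day:02d}", b"...") for day in range(1, 11)]
        with Pool(4) as pool, tqdm(total=len(row_list), leave=False) as sub_pbar:
            worker = functools.partial(
                dump_stock_daily, type_name="TickQueue", stock_id="000001.SZ"
            )
            for _ in pool.imap_unordered(worker, row_list):
                sub_pbar.update()

Binding a module-level (or static) function with functools.partial, rather than a lambda or a bound method, matters here: the callable has to be pickled into the worker processes, which is presumably why the original dispatches through DDBHFTLoader.dump_stock_daily_to_ddb instead of self.dump_stock_daily_to_ddb.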