From 893603fafcb170692b8a5a1824fcf5787802fe5d Mon Sep 17 00:00:00 2001
From: Guofu Li <li.guofu.l@gmail.com>
Date: Sun, 28 Aug 2022 19:59:49 +0800
Subject: [PATCH] Bug fix: create the Python process pool dynamically, which
 fixes the memory leak for good.

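Instead of keeping one long-lived multiprocessing.Pool (and the class-level
ddb.DBConnectionPool) alive for the whole batch, dump_hft_to_ddb now creates
a fresh process pool per stock and closes and joins it once that stock is
done, so the worker processes exit and return their memory to the OS.

A minimal sketch of the pattern (dump_one_stock and process_one_day are
illustrative placeholders, not names from this repo):

    import functools
    from multiprocessing import Pool

    def process_one_day(trade_date, stock_id):
        # Placeholder for the per-date work, e.g. dumping one day of one
        # stock's ticks into DolphinDB.
        return trade_date

    def dump_one_stock(stock_id, date_list, num_workers=4):
        # A short-lived pool for this stock only, rather than one pool
        # shared across the whole run.
        pool = Pool(num_workers)
        try:
            for _ in pool.imap_unordered(
                    functools.partial(process_one_day, stock_id=stock_id),
                    date_list):
                pass
        finally:
            # close() + join() lets every worker exit, releasing whatever
            # memory it accumulated before the next stock starts.
            pool.close()
            pool.join()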
---
 src/loader/DDBHFTLoader.py | 40 ++++++++++++++++++++++----------------
 1 file changed, 23 insertions(+), 17 deletions(-)

diff --git a/src/loader/DDBHFTLoader.py b/src/loader/DDBHFTLoader.py
index 5360ea4..641acd3 100644
--- a/src/loader/DDBHFTLoader.py
+++ b/src/loader/DDBHFTLoader.py
@@ -96,8 +96,7 @@ class DDBHFTLoader(DDBLoader):
     default_table_capacity = 10000
     # TODO: this should reference the absolute path of this file itself, and then look up its parent directory
     ddb_dump_journal_fname = '../assets/ddb_dump_journal.csv'
-
-    ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers)
+    #ddb_sess_pool = ddb.DBConnectionPool(DDBLoader.ddb_config['host'], 8848, num_workers)
 
     def init_ddb_database(self, df_calendar):
         """
@@ -120,16 +119,16 @@ class DDBHFTLoader(DDBLoader):
         stock_list = df_calendar['code'].unique().astype('str')
 
         # A Pool object must not be created repeatedly, so it needs to be created at the outermost level of the loop and then passed in as an argument
-        with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-            # Always reuse the connection object, to reduce the memory consumption. 
-            with self.mssql_engine.connect() as conn:
-                # Loop through the stock list.
-                for hft_type_name in self.hft_type_list:
-                    print('Will work on hft type:', hft_type_name)
-                    with tqdm(stock_list) as pbar:
-                        for stock_id in pbar:
-                            pbar.set_description(f"Working on stock {stock_id}")
-                            self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar, pool=pool)
+        #with Pool(self.num_workers if num_workers is None else num_workers) as pool:
+        # Always reuse the connection object to reduce memory consumption.
+        with self.mssql_engine.connect() as conn:
+            # Loop through the stock list.
+            for hft_type_name in self.hft_type_list:
+                print('Will work on hft type:', hft_type_name)
+                with tqdm(stock_list) as pbar:
+                    for stock_id in pbar:
+                        pbar.set_description(f"Working on stock {stock_id}")
+                        self.dump_hft_to_ddb(hft_type_name, stock_id, conn, pbar=pbar)
 
 
     def _get_stock_date_list(self, cache=False):
@@ -359,7 +358,7 @@ class DDBHFTLoader(DDBLoader):
         print('-' * 80)
 
     
-    def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None, pool=None):
+    def dump_hft_to_ddb(self, type_name, stock_id, conn, trade_date=None, pbar=None):
         if (type_name, stock_id, 'OK') in self.dump_journal_df.index:
             message = f"Will skip ({type_name}, {stock_id}) as it appears in the dump journal."
             if pbar is None:
@@ -413,13 +412,14 @@ class DDBHFTLoader(DDBLoader):
         # 使用多进程来加快速度
         
         #with Pool(self.num_workers if num_workers is None else num_workers) as pool:
-        if pool is None:
-            print("Will create new Pool object, but this is not encourage for large batch work.")
-            pool = Pool(self.num_worker)
+        #if pool is None:
+        #    print("Will create new Pool object, but this is not encouraged for large batch work.")
+        #    pool = Pool(self.num_worker)
 
+        py_proc_pool = Pool(self.num_workers)
+        # Within a single stock, parallelize over trade dates; this is memory-friendly, since it avoids loading the massive full history of multiple stocks at the same time
         with tqdm(total=num_rows, leave=False) as sub_pbar:
-            for _ in pool.imap_unordered(
+            for _ in py_proc_pool.imap_unordered(
                 functools.partial(
                     DDBHFTLoader.dump_stock_daily_to_ddb, 
                     type_name = type_name,
@@ -429,7 +429,12 @@ class DDBHFTLoader(DDBLoader):
             ):
                 sub_pbar.update()
 
+        # Always remember to close and join the pool manually, so the workers exit and release their memory.
+        py_proc_pool.close()
+        py_proc_pool.join()
+
         del(row_list)
+
         self.dump_journal_writer.write(f"{type_name},{stock_id},OK\n")
         self.dump_journal_writer.flush()
 
@@ -506,5 +511,6 @@ class DDBHFTLoader(DDBLoader):
         ddb_sess.undefAll()
         ddb_sess.close()
         del(ddb_sess)
+        del(row)