Add qlib_loader.py with configurable date range for data loading

## New Files
- src/qlib_loader.py - Qlib data loader utility with:
  - load_data_from_handler() - Load data with configurable start/end dates
  - load_data_with_proc_list() - Full pipeline with preprocessing
  - load_and_dump_data() - Dump raw and processed data to pickle files
  - Fixed processor implementations (FixedDiff, FixedColumnRemover, etc.)
    that handle :: separator column format correctly
  - NaN filling workaround for con_rating_strength column

- config/handler.yaml - Modified handler config with <LOAD_START> and
  <LOAD_END> placeholders instead of hardcoded <SINCE_DATE> and <TODAY>

- data/.gitignore - Ignore pickle and parquet data files

## Updated
- README.md - Documentation for data loading with configurable date range

## Key Changes
1. Fixed Diff processor bug: Column names now correctly use :: separator
   format (e.g., 'feature_ext::log_size_diff') instead of malformed
   string representations of tuples

2. Preserved trained parameters: Fixed processors use mean_train/std_train
   from original proc_list pickle for RobustZScoreNorm

3. Configurable end date: handler.yaml now respects user-specified end
   dates instead of always loading until today

## Tested
- Successfully dumps raw data (before proc_list) to pickle files
- Successfully applies fixed proc_list and dumps processed data
- Both 2019-01 and 2025-01 data processed without errors
master
guofu 4 days ago
parent 49c9dae181
commit 586b16a6fa

@@ -0,0 +1,229 @@ README.md
# Alpha158 0_7 vs 0_7_beta Prediction Comparison
This directory contains a workflow for comparing Alpha158 version 0_7 (original) vs 0_7_beta (enhanced with VAE embeddings) predictions.
## Overview
The goal is to evaluate whether the beta version of Alpha158 factors produces better predictions than the original 0_7 version when used with the d033 prediction model.
## Directory Structure
```
stock_1d/d033/alpha158_beta/
├── README.md                         # This file
├── config.yaml                       # VAE model configuration
├── pipeline.py                       # Main orchestration script
├── scripts/                          # Core pipeline scripts
│   ├── generate_beta_embedding.py    # Generate VAE embeddings from beta factors
│   ├── generate_returns.py           # Generate actual returns from kline data
│   ├── fetch_predictions.py          # Fetch original predictions from DolphinDB
│   ├── predict_with_embedding.py     # Generate predictions using beta embeddings
│   └── compare_predictions.py        # Compare 0_7 vs 0_7_beta predictions
├── src/                              # Source modules
│   └── qlib_loader.py                # Qlib data loader with configurable date range
├── config/                           # Configuration files
│   └── handler.yaml                  # Modified handler with configurable end date
└── data/                             # Data files (gitignored)
    ├── embedding_0_7_beta.parquet
    ├── predictions_beta_embedding.parquet
    ├── original_predictions_0_7.parquet
    ├── actual_returns.parquet
    ├── raw_data_*.pkl                # Raw data before preprocessing
    └── processed_data_*.pkl          # Processed data after preprocessing
```
## Data Loading with Configurable Date Range
### handler.yaml Modification
The original `handler.yaml` uses the `<TODAY>` placeholder, which always loads data until today's date. The modified version in `config/handler.yaml` uses a `<LOAD_END>` placeholder that can be controlled via arguments:
```yaml
# Original (always loads until today)
load_start: &load_start <SINCE_DATE>
load_end: &load_end <TODAY>

# Modified (configurable end date)
load_start: &load_start <LOAD_START>
load_end: &load_end <LOAD_END>
```
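At runtime the loader builds a placeholder map (`<LOAD_START>` = `since_date - buffer_days`, `<LOAD_END>` = the user's end date) and hands it, together with the parsed YAML, to qlib's `fill_placeholder`. The sketch below imitates that substitution with a hypothetical `fill_placeholders` helper so the mechanics are visible without qlib installed; it is an illustration, not qlib's actual implementation.

```python
def fill_placeholders(node, values):
    """Recursively substitute placeholder strings in a parsed YAML config.

    A simplified stand-in for qlib's fill_placeholder, for illustration only.
    """
    if isinstance(node, dict):
        return {k: fill_placeholders(v, values) for k, v in node.items()}
    if isinstance(node, list):
        return [fill_placeholders(v, values) for v in node]
    if isinstance(node, str) and node in values:
        return values[node]
    return node

# A fragment of the parsed handler.yaml after anchor resolution
config = {"start_time": "<LOAD_START>", "end_time": "<LOAD_END>"}
filled = fill_placeholders(config, {
    "<LOAD_START>": "2018-12-12",  # since_date - buffer_days (20)
    "<LOAD_END>": "2019-01-31",    # user-specified end date, not <TODAY>
})
print(filled)  # {'start_time': '2018-12-12', 'end_time': '2019-01-31'}
```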
### Using qlib_loader.py
```python
from stock_1d.d033.alpha158_beta.src.qlib_loader import (
    load_data_from_handler,
    load_data_with_proc_list,
    load_and_dump_data,
)

# Load data with configurable date range
df = load_data_from_handler(
    since_date="2019-01-01",
    end_date="2019-01-31",
    buffer_days=20,  # Extra days for diff calculations
    verbose=True,
)

# Load and apply preprocessing pipeline
df_processed = load_data_with_proc_list(
    since_date="2019-01-01",
    end_date="2019-01-31",
    proc_list_path="/path/to/proc_list.proc",
    buffer_days=20,
)

# Load and dump both raw and processed data to pickle files
raw_df, processed_df = load_and_dump_data(
    since_date="2019-01-01",
    end_date="2019-01-31",
    output_dir="data/",
    fill_con_rating_nan=True,  # Fill NaN in con_rating_strength column
    verbose=True,
)
```
### Key Features
1. **Configurable end date**: Unlike the original handler.yaml, the end date is now respected
2. **Buffer period handling**: Automatically loads extra days before `since_date` for diff calculations
3. **NaN handling**: Optional filling of NaN values in `con_rating_strength` column
4. **Dual output**: Saves both raw (before proc_list) and processed (after proc_list) data
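To make the buffer behavior concrete, here is a minimal self-contained sketch on synthetic data (the column name is hypothetical): extra days before `since_date` are loaded so the first in-range rows have history for `diff`, and the buffer is then sliced away with the same MultiIndex `.loc(axis=0)[slice(...)]` pattern the loader uses.

```python
import numpy as np
import pandas as pd

since, end = pd.Timestamp("2019-01-01"), pd.Timestamp("2019-01-31")
buffer_days = 20
load_start = since - pd.Timedelta(days=buffer_days)

# Synthetic (datetime, instrument) MultiIndex covering the buffered range
dates = pd.date_range(load_start, end, freq="D")
idx = pd.MultiIndex.from_product(
    [dates, ["000001.SZ", "600000.SH"]], names=["datetime", "instrument"]
)
df = pd.DataFrame({"feature_ext::log_size": np.random.rand(len(idx))}, index=idx)

# diff needs the buffer rows for history, then the buffer is sliced away
df["feature_ext::log_size_diff"] = (
    df.groupby("instrument")["feature_ext::log_size"].diff().fillna(0.0)
)
trimmed = df.loc(axis=0)[slice(since, end), :]
print(trimmed.index.get_level_values("datetime").min())  # 2019-01-01 00:00:00
```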
### Processor Fixes
The `qlib_loader.py` includes fixed implementations of qlib processors that correctly handle the `::` separator column format:
- `FixedDiff` - Fixes column naming bug (creates proper `feature::col_diff` names)
- `FixedColumnRemover` - Handles `::` separator format
- `FixedRobustZScoreNorm` - Uses trained `mean_train`/`std_train` parameters from pickle
- `FixedIndusNtrlInjector` - Industry neutralization with `::` format
- Other fixed processors for the full preprocessing pipeline
All fixed processors preserve the trained parameters from the original proc_list pickle.
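The naming bug is easy to reproduce in isolation: appending a suffix to a tuple column name stringifies the tuple, whereas the fixed processors first flatten the tuple with `::` so suffixes compose cleanly:

```python
tuple_col = ("feature_ext", "log_size")

# The bug, in miniature: formatting a tuple column name stringifies the tuple
buggy = f"{tuple_col}_diff"
print(buggy)   # ('feature_ext', 'log_size')_diff

# The fix: flatten to the '::' form first, then append the suffix
flat_col = "::".join(tuple_col)
fixed = f"{flat_col}_diff"
print(fixed)   # feature_ext::log_size_diff
```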
## Workflow
### 1. Generate Beta Embeddings
Generate VAE embeddings from the alpha158_0_7_beta factors:
```bash
python scripts/generate_beta_embedding.py --start-date 2019-01-01 --end-date 2020-11-30
```
This loads data from Parquet, applies the full feature transformation pipeline, and encodes with the VAE model.
Output: `data/embedding_0_7_beta.parquet`
### 2. Fetch Original Predictions
Fetch the original 0_7 predictions from DolphinDB:
```bash
python scripts/fetch_predictions.py --start-date 2019-01-01 --end-date 2020-11-30
```
Output: `data/original_predictions_0_7.parquet`
### 3. Generate Predictions with Beta Embeddings
Use the d033 model to generate predictions from the beta embeddings:
```bash
python scripts/predict_with_embedding.py --start-date 2019-01-01 --end-date 2020-11-30
```
Output: `data/predictions_beta_embedding.parquet`
### 4. Generate Actual Returns
Generate actual returns from kline data for IC calculation:
```bash
python scripts/generate_returns.py
```
Output: `data/actual_returns.parquet`
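The exact logic inside `generate_returns.py` is not shown here, but the conventional next-day forward return from kline closes looks like the following sketch (synthetic prices; the `close` column name and the return definition are assumptions):

```python
import pandas as pd

# Hypothetical kline frame: one row per (datetime, instrument) with a close price
kline = pd.DataFrame({
    "datetime": pd.to_datetime(
        ["2019-01-02"] * 2 + ["2019-01-03"] * 2 + ["2019-01-04"] * 2
    ),
    "instrument": ["000001.SZ", "600000.SH"] * 3,
    "close": [10.0, 20.0, 11.0, 19.0, 11.55, 19.95],
}).set_index(["datetime", "instrument"]).sort_index()

# Next-day return aligned to the prediction date:
#   ret_t = close_{t+1} / close_t - 1, computed per instrument
returns = (
    kline.groupby("instrument")["close"]
    .transform(lambda s: s.shift(-1) / s - 1)
    .rename("actual_return")
)
# First in-sample return for 000001.SZ (11.0 / 10.0 - 1, i.e. about 0.10)
print(returns.loc[(pd.Timestamp("2019-01-02"), "000001.SZ")])
```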
### 5. Compare Predictions
Compare the 0_7 vs 0_7_beta predictions:
```bash
python scripts/compare_predictions.py
```
This calculates:
- Prediction correlation (Pearson and Spearman)
- Daily correlation statistics
- IC metrics (mean, std, IR)
- RankIC metrics
- Top-tier returns (top 10%)
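These metrics follow the usual convention: IC is the per-day cross-sectional correlation between predictions and realized returns, RankIC is the same on ranks, and IR is IC mean divided by IC std. A hedged sketch with synthetic data (not the actual `compare_predictions.py` code):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n_days, n_stocks = 5, 50
frames = []
for d in pd.date_range("2019-01-02", periods=n_days, freq="B"):
    pred = rng.normal(size=n_stocks)
    ret = 0.3 * pred + rng.normal(size=n_stocks)  # weakly predictive signal
    frames.append(pd.DataFrame({"datetime": d, "pred": pred, "ret": ret}))
df = pd.concat(frames, ignore_index=True)

# Daily IC: cross-sectional Pearson correlation between predictions and returns
daily_ic = df.groupby("datetime")[["pred", "ret"]].apply(
    lambda g: g["pred"].corr(g["ret"])
)
# Daily RankIC: Pearson correlation of cross-sectional ranks (== Spearman)
daily_rank_ic = df.groupby("datetime")[["pred", "ret"]].apply(
    lambda g: g["pred"].rank().corr(g["ret"].rank())
)

ic_mean, ic_std = daily_ic.mean(), daily_ic.std()
print(f"IC mean={ic_mean:.3f}  IC std={ic_std:.3f}  IR={ic_mean / ic_std:.3f}")
print(f"RankIC mean={daily_rank_ic.mean():.3f}")
```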
## Quick Start
Run the full pipeline:
```bash
python pipeline.py --start-date 2019-01-01 --end-date 2020-11-30
```
Or run individual steps:
```bash
# Step 1: Generate embeddings
python scripts/generate_beta_embedding.py --start-date 2019-01-01 --end-date 2020-11-30
# Step 2: Fetch original predictions
python scripts/fetch_predictions.py --start-date 2019-01-01 --end-date 2020-11-30
# Step 3: Generate beta predictions
python scripts/predict_with_embedding.py
# Step 4: Generate returns
python scripts/generate_returns.py
# Step 5: Compare
python scripts/compare_predictions.py
```
## Data Dependencies
### Input Data (from Parquet)
- `/data/parquet/dataset/stg_1day_wind_alpha158_0_7_beta_1D/` - Alpha158 beta factors
- `/data/parquet/dataset/stg_1day_wind_kline_adjusted_1D/` - Market data (kline)
- `/data/parquet/dataset/stg_1day_gds_indus_flag_cc1_1D/` - Industry flags
### Models
- `/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/model/csiallx_feature2_ntrla_flag_pnlnorm_vae4_dim32a_beta0001/module.pt` - VAE encoder
- `/home/guofu/Workspaces/alpha/data_ops/tasks/app_longsignal/model/host140_exp20_d033/module.pt` - d033 prediction model
### DolphinDB
- Table: `dfs://daily_stock_run_multicast/app_1day_multicast_longsignal_port`
- Version: `host140_exp20_d033`
## Key Metrics
The comparison script outputs:
| Metric | Description |
|--------|-------------|
| Pearson Correlation | Overall correlation between 0_7 and beta predictions |
| Spearman Correlation | Rank correlation between predictions |
| Daily Correlation | Mean and std of daily correlations |
| IC Mean | Average information coefficient |
| IC Std | Standard deviation of IC |
| IC IR | Information ratio (IC Mean / IC Std) |
| RankIC | Spearman correlation with returns |
| Top-tier Return | Average return of top 10% predictions |
## Notes
- All scripts can be run from the `alpha158_beta/` directory
- Scripts use relative paths (`../data/`) to locate data files
- The VAE model expects 341 input features after the transformation pipeline
- The d033 model expects 32-dimensional embeddings with a 40-day lookback window

@@ -0,0 +1,34 @@ config/handler.yaml
qlib_init:
    provider_uri: "/home/guofu/.qlib/data_ops/target"
    region: cn

ddb_config: &ddb_config
    host: 192.168.1.146
    port: 8848
    username: "admin"
    password: "123456"

# Load date range - these placeholders should be replaced at runtime
# LOAD_START = since_date - buffer_days (e.g., 20 days for diff calculation)
# LOAD_END = end_date (user-specified, NOT today's date)
load_start: &load_start <LOAD_START>
load_end: &load_end <LOAD_END>

market: &market csiallx

data_handler_config: &data_handler_config
    start_time: *load_start
    end_time: *load_end
    instruments: *market
    ddb_config: *ddb_config
    handler_list:
        - <HANDLER_ALPHA158>
        - <HANDLER_MARKET_EXT>
        - <HANDLER_MARKET_FLAG>
        - <HANDLER_INDUS_FLAG>
        - <HANDLER_ST_FLAG>

handler:
    class: AggHandler
    module_path: qlib.contrib.data.agg_handler
    kwargs: *data_handler_config

@@ -0,0 +1,8 @@ data/.gitignore
# Ignore all pickle and parquet data files
*.pkl
*.parquet
*.npy
*.npz
# Keep the .gitignore itself
!.gitignore

@@ -0,0 +1,876 @@ src/qlib_loader.py
#!/usr/bin/env python
"""
Qlib Loader Utility - Load data using the gold-standard handler.yaml configuration.

This module provides a wrapper around qlib's AggHandler that allows specifying
both start and end dates for data loading, unlike the original handler.yaml
which always loads until today.
"""
import os
import sys
import pprint
import datetime
from pathlib import Path
from typing import Optional, Dict, Any

import pandas as pd
from ruamel.yaml import YAML

# NumPy 2.0 compatibility: np.NaN was removed in NumPy 2.0
# This must be set BEFORE importing qlib modules that use np.NaN
import numpy as np
if not hasattr(np, 'NaN'):
    np.NaN = np.nan

# Add qlib imports
import qlib
from qlib.utils import (
    init_instance_by_config,
    fill_placeholder
)
from qlib.contrib.utils import load_placehorder_from_module

# Path to the modified handler.yaml
# qlib_loader.py is at: stock_1d/d033/alpha158_beta/src/qlib_loader.py
# handler.yaml is at:   stock_1d/d033/alpha158_beta/config/handler.yaml
CURRENT_DIR = Path(__file__).parent  # src/
PROJECT_DIR = CURRENT_DIR.parent     # alpha158_beta/
HANDLER_YAML_PATH = PROJECT_DIR / "config" / "handler.yaml"

# Original handler.yaml path (for reference)
ORIGINAL_HANDLER_YAML_PATH = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/handler.yaml"
def load_data_from_handler(
    since_date: str,
    end_date: str,
    buffer_days: int = 20,
    yaml_path: Optional[str] = None,
    verbose: bool = True
) -> pd.DataFrame:
    """
    Load data using qlib's AggHandler with configurable date range.

    Args:
        since_date: Start date for the data (YYYY-MM-DD or datetime-like)
        end_date: End date for the data (YYYY-MM-DD or datetime-like)
        buffer_days: Extra days to load before since_date for diff calculations (default: 20)
        yaml_path: Path to handler.yaml (default: uses the modified version in config/)
        verbose: Print debug information

    Returns:
        pd.DataFrame: Loaded data with MultiIndex (datetime, instrument)

    Notes:
        - The buffer_days is needed because the Diff processor calculates
          period-over-period changes, which requires looking back in time.
        - After loading, the result is filtered to [since_date, end_date],
          so the buffer period is not returned to the caller.
    """
    # Resolve yaml path
    if yaml_path is None:
        yaml_path = HANDLER_YAML_PATH
    yaml_path = Path(yaml_path)
    if not yaml_path.exists():
        raise FileNotFoundError(f"handler.yaml not found at {yaml_path}")

    # Convert since_date / end_date to datetime if strings
    if isinstance(since_date, str):
        since_date = pd.to_datetime(since_date)
    if isinstance(end_date, str):
        end_date = pd.to_datetime(end_date)

    # Calculate load start (with buffer for diff calculations)
    load_start = since_date - pd.Timedelta(days=buffer_days)

    if verbose:
        print("=" * 60)
        print("Loading data from handler.yaml")
        print("=" * 60)
        print(f"  Requested range: {since_date.date()} to {end_date.date()}")
        print(f"  Buffer days: {buffer_days}")
        print(f"  Actual load range: {load_start.date()} to {end_date.date()}")
        print(f"  Handler yaml: {yaml_path}")

    # Load yaml config
    yaml_loader = YAML(typ='safe', pure=True)
    with open(yaml_path) as f:
        config = yaml_loader.load(f)

    # Initialize qlib
    from qlib.workflow.cli import sys_config
    config_path = "qlib.contrib.data.config"
    sys_config(config, config_path)
    qlib.init(**config.get("qlib_init"))

    # Prepare placeholder values
    placeholder_value = {
        "<LOAD_START>": load_start,
        "<LOAD_END>": end_date,
    }

    # Also load placeholders from handler module if available
    try:
        placeholder_value.update(
            load_placehorder_from_module(config["handler"])
        )
    except Exception as e:
        if verbose:
            print(f"  Note: Could not load placeholders from handler module: {e}")

    # Fill placeholders in config
    config = fill_placeholder(config, placeholder_value)

    if verbose:
        print("\nHandler config after filling placeholders:")
        pprint.pprint(config)

    # Initialize handler and grab the underlying data
    handler = init_instance_by_config(config["handler"])
    data = handler._data

    if verbose:
        # SepDataFrame doesn't have .shape, convert to DataFrame first
        if hasattr(data, 'to_frame'):
            data_df = data.to_frame()  # Convert SepDataFrame to DataFrame
        else:
            data_df = data
        print(f"\nLoaded data shape: {data_df.shape}")
        print(f"Data index levels: {data_df.index.names}")
        print(f"Data columns: {list(data_df.columns)[:20]}...")
        print(f"\nFiltering to requested range: {since_date.date()} to {end_date.date()}")

    # Filter to the requested date range (remove buffer period)
    if isinstance(data.index, pd.MultiIndex):
        data = data.loc(axis=0)[slice(since_date, end_date), :]
    else:
        data = data.loc[slice(since_date, end_date)]

    if verbose:
        # Again handle SepDataFrame
        if hasattr(data, 'to_frame'):
            data_df = data.to_frame()
        else:
            data_df = data
        print(f"Filtered data shape: {data_df.shape}")
        print("=" * 60)

    return data
def load_data_with_proc_list(
    since_date: str,
    end_date: str,
    proc_list_path: Optional[str] = None,
    buffer_days: int = 20,
    yaml_path: Optional[str] = None,
    verbose: bool = True
) -> pd.DataFrame:
    """
    Load data and apply the preprocessing pipeline (proc_list).

    This is the full gold-standard pipeline that produces the exact features
    the VAE was trained on.

    Args:
        since_date: Start date for the data (YYYY-MM-DD)
        end_date: End date for the data (YYYY-MM-DD)
        proc_list_path: Path to proc_list.proc file
        buffer_days: Extra days to load before since_date
        yaml_path: Path to handler.yaml
        verbose: Print debug information

    Returns:
        pd.DataFrame: Preprocessed data
    """
    import pickle as pkl
    from qlib.contrib.data.utils import apply_proc_list

    # Default proc_list path
    if proc_list_path is None:
        proc_list_path = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/proc_list.proc"

    if verbose:
        print("Step 1: Loading raw data from handler...")

    # Load raw data
    df = load_data_from_handler(
        since_date=since_date,
        end_date=end_date,
        buffer_days=buffer_days,
        yaml_path=yaml_path,
        verbose=verbose
    )

    if verbose:
        print("\nStep 2: Loading preprocessing pipeline (proc_list)...")
        print(f"  Path: {proc_list_path}")

    # Load proc_list
    with open(proc_list_path, "rb") as f:
        proc_list = pkl.load(f)

    if verbose:
        print(f"  Number of processors: {len(proc_list)}")
        for i, proc in enumerate(proc_list):
            print(f"    [{i}] {type(proc).__name__}")
        print("\nStep 3: Applying preprocessing pipeline...")

    # Apply proc_list
    # Note: with_fit=False because we use pre-fitted parameters
    df_processed = apply_proc_list(df, proc_list=proc_list, with_fit=False)

    if verbose:
        print(f"\nProcessed data shape: {df_processed.shape}")
        print("=" * 60)

    return df_processed
def _fill_con_rating_nan(raw_data, verbose=True):
    """
    Fill NaN values in con_rating_strength column before applying proc_list.

    The Diff processor creates NaN values, and FlagMarketInjector fails when
    trying to convert columns with NaN to int8. This function fills NaN in
    con_rating_strength with the column median to avoid IntCastingNaNError.

    Args:
        raw_data: SepDataFrame or DataFrame with MultiIndex columns
        verbose: Print debug info

    Returns:
        Same type as input, with NaN filled in con_rating_strength
    """
    # Check if this is a SepDataFrame (qlib's separated DataFrame)
    is_sep = hasattr(raw_data, 'to_frame') and type(raw_data).__name__ == 'SepDataFrame'

    # Convert SepDataFrame to DataFrame if needed
    if is_sep:
        df = raw_data.to_frame()
    else:
        df = raw_data

    # Check if con_rating_strength exists in feature_ext group
    target_col = ('feature_ext', 'con_rating_strength')
    if target_col in df.columns:
        median_val = df[target_col].median()
        nan_count = df[target_col].isna().sum()
        if verbose:
            print(f"  Filling {nan_count} NaN values in con_rating_strength with median={median_val:.4f}")
        # Create a copy and fill NaN
        df = df.copy()
        df[target_col] = df[target_col].fillna(median_val)
        if verbose:
            print(f"  Verified: {df[target_col].isna().sum()} NaN remaining")
        return df

    if verbose:
        print("  con_rating_strength not found, skipping NaN fill")
    return raw_data
class FixedDiff:
    """
    Fixed Diff processor that correctly handles :: separator column format.

    The original qlib Diff processor has a bug where it creates column names like:
        "('feature_ext', 'log_size')_diff"  (string representation of tuple)
    Instead of:
        'feature_ext::log_size_diff'  (proper :: separator format)
    """

    def __init__(self, fields_group, suffix="diff", periods=1):
        self.fields_group = fields_group
        self.suffix = suffix
        self.periods = periods

    def __call__(self, df):
        # Get columns for this group - handle :: separator format
        cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
        df_cols = df[cols]
        cols_name = df_cols.columns

        # Apply diff transformation per instrument
        df_cols_diff = df_cols.groupby("instrument").transform(
            lambda x: x.ffill().diff(self.periods).fillna(0.)
        )
        df_cols = pd.concat([df_cols, df_cols_diff], axis=1)

        # Name the concatenated frame: originals keep their names, diff copies
        # get the suffix. pd.concat puts all diff columns AFTER the originals,
        # so the suffixed names form a second block rather than interleaving
        # with the originals.
        new_cols = list(cols_name) + [f"{name}_{self.suffix}" for name in cols_name]
        df_cols.columns = new_cols
        df[df_cols.columns] = df_cols
        return df
class FixedColumnRemover:
    """Fixed ColumnRemover that handles :: separator format."""

    def __init__(self, fields_group):
        self.fields_group = fields_group

    def __call__(self, df):
        cols_to_remove = []
        for item in self.fields_group:
            if item in df.columns:
                cols_to_remove.append(item)
        return df.drop(columns=cols_to_remove, errors='ignore')
class FixedFlagToOnehot:
    """Fixed FlagToOnehot that handles :: separator format."""

    def __init__(self, fields_group, onehot_group, format_compact=False):
        self.fields_group = fields_group
        self.onehot_group = onehot_group
        self.format_compact = format_compact

    def __call__(self, df):
        cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
        for col in cols:
            industry_code = col.split('::')[1]
            new_col = f"{self.onehot_group}::{industry_code}"
            df[new_col] = df[col].astype(int)
        return df
class FixedIndusNtrlInjector:
    """Fixed IndusNtrlInjector that handles :: separator format."""

    def __init__(self, fields_group, input_group, indus_group,
                 indus_suffix="_ntrl", ntrl_suffix="_ntrl", keep_origin=True,
                 include_indus=False, include_indus_std=False, norm_by_ntrl=False):
        self.fields_group = fields_group
        self.input_group = input_group
        self.indus_group = indus_group
        self.indus_suffix = indus_suffix
        self.ntrl_suffix = ntrl_suffix
        self.keep_origin = keep_origin
        self.include_indus = include_indus
        self.include_indus_std = include_indus_std
        self.norm_by_ntrl = norm_by_ntrl

    def __call__(self, df):
        feature_cols = [c for c in df.columns if c.startswith(f"{self.input_group}::")]
        indus_cols = [c for c in df.columns if c.startswith(f"{self.indus_group}::")]

        # Get primary industry column (first one with any True values)
        indus_assign = None
        for ic in indus_cols:
            if df[ic].any():
                indus_assign = ic
                break
        if indus_assign is None:
            return df

        for feat_col in feature_cols:
            feat_name = feat_col.split('::')[1]
            grouped = df.groupby(indus_assign)[feat_col]
            indus_mean = grouped.transform('mean')
            indus_std = grouped.transform('std')
            ntrl_col = f"{self.input_group}::{feat_name}{self.ntrl_suffix}"
            df[ntrl_col] = (df[feat_col] - indus_mean) / indus_std.replace(0, np.nan)
        return df
class FixedRobustZScoreNorm:
    """Fixed RobustZScoreNorm that handles :: separator format with trained params."""

    def __init__(self, fields_group, mean_train, std_train, clip_outlier=True, cols=None):
        self.fields_group = fields_group
        self.mean_train = mean_train
        self.std_train = std_train
        self.clip_outlier = clip_outlier
        self.cols = cols

    def __call__(self, df):
        # Get columns to normalize
        if isinstance(self.fields_group, list):
            cols_to_norm = []
            for grp in self.fields_group:
                cols_to_norm.extend([c for c in df.columns if c.startswith(f"{grp}::")])
        else:
            cols_to_norm = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]

        # Apply normalization using trained mean/std
        if self.mean_train is not None and self.std_train is not None:
            for i, col in enumerate(cols_to_norm):
                if i < len(self.mean_train) and i < len(self.std_train):
                    mean_val = self.mean_train[i]
                    std_val = self.std_train[i]
                    if std_val > 0:
                        df[col] = (df[col] - mean_val) / std_val
        return df
class FixedFillna:
    """Fixed Fillna that handles :: separator format."""

    def __init__(self, fields_group, fill_value=0):
        self.fields_group = fields_group
        self.fill_value = fill_value

    def __call__(self, df):
        if isinstance(self.fields_group, list):
            cols_to_fill = []
            for grp in self.fields_group:
                cols_to_fill.extend([c for c in df.columns if c.startswith(f"{grp}::")])
        else:
            cols_to_fill = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
        df[cols_to_fill] = df[cols_to_fill].fillna(self.fill_value)
        return df
class FixedFlagMarketInjector:
    """Fixed FlagMarketInjector that handles :: separator format."""

    def __init__(self, fields_group, vocab_size=2):
        self.fields_group = fields_group
        self.vocab_size = vocab_size

    def __call__(self, df):
        cols = [c for c in df.columns if c.startswith(f"{self.fields_group}::")]
        for col in cols:
            df[col] = df[col].astype('int8')
        return df
class FixedFlagSTInjector:
    """Fixed FlagSTInjector that handles :: separator format."""

    def __init__(self, fields_group, st_group="st_flag", col_name="IsST"):
        self.fields_group = fields_group
        self.st_group = st_group
        self.col_name = col_name

    def __call__(self, df):
        cols = [c for c in df.columns if c.startswith(f"{self.st_group}::")]
        for col in cols:
            df[col] = df[col].astype('int8')
        return df
def convert_columns_to_double_colon(df):
    """
    Convert MultiIndex tuple columns to '::' separator string format.

    This is needed because the proc_list was trained on data with column names like:
        'feature_ext::log_size'
    But our loader produces MultiIndex tuples:
        ('feature_ext', 'log_size')

    Args:
        df: DataFrame with MultiIndex tuple columns

    Returns:
        DataFrame with string columns using '::' separator
    """
    if not isinstance(df.columns, pd.MultiIndex):
        return df

    # Create new column names with :: separator
    new_columns = [f"{grp}::{col}" for grp, col in df.columns]
    df_copy = df.copy()
    df_copy.columns = new_columns
    return df_copy
def convert_columns_from_double_colon(df):
    """
    Convert '::' separator string columns back to MultiIndex tuple format.

    Args:
        df: DataFrame with '::' separator string columns

    Returns:
        DataFrame with MultiIndex tuple columns
    """
    # Check if columns are strings with :: separator
    if not isinstance(df.columns, pd.Index):
        return df

    # Check if any column contains ::
    has_double_colon = any(isinstance(c, str) and '::' in c for c in df.columns)
    if not has_double_colon:
        return df

    # Convert to MultiIndex
    new_columns = [tuple(c.split('::', 1)) for c in df.columns]
    df_copy = df.copy()
    df_copy.columns = pd.MultiIndex.from_tuples(new_columns)
    return df_copy
def load_and_dump_data(
    since_date: str,
    end_date: str,
    output_dir: Optional[str] = None,
    proc_list_path: Optional[str] = None,
    buffer_days: int = 20,
    yaml_path: Optional[str] = None,
    verbose: bool = True,
    skip_proc_list: bool = False,
    fill_con_rating_nan: bool = True
) -> tuple:
    """
    Load data and dump both raw (before proc_list) and processed (after proc_list) versions.

    This function saves:
    1. Raw data from handler (before applying preprocessing pipeline)
    2. Processed data (after applying proc_list) - if not skipped and if successful

    Note: The proc_list may fail due to compatibility issues with the data. In this
    case, only the raw data will be saved.

    Args:
        since_date: Start date for the data (YYYY-MM-DD)
        end_date: End date for the data (YYYY-MM-DD)
        output_dir: Output directory for pickle files (default: data/ folder)
        proc_list_path: Path to proc_list.proc file
        buffer_days: Extra days to load before since_date
        yaml_path: Path to handler.yaml
        verbose: Print debug information
        skip_proc_list: If True, skip applying proc_list entirely
        fill_con_rating_nan: If True, fill NaN in con_rating_strength before proc_list

    Returns:
        tuple: (raw_df, processed_df or None)
    """
    import pickle as pkl
    from qlib.contrib.data.utils import apply_proc_list

    # Default output directory (data/ folder in project)
    if output_dir is None:
        output_dir = PROJECT_DIR / "data"
    else:
        output_dir = Path(output_dir)

    # Default proc_list path
    if proc_list_path is None:
        proc_list_path = "/home/guofu/Workspaces/alpha/data_ops/tasks/dwm_feature_vae/dataset/csiallx_feature2_ntrla_flag_pnlnorm/proc_list.proc"

    if verbose:
        print("=" * 60)
        print("Loading and dumping data")
        print("=" * 60)
        print(f"  Output directory: {output_dir}")
        print()

    # Ensure output directory exists
    output_dir.mkdir(parents=True, exist_ok=True)

    # Convert dates
    if isinstance(since_date, str):
        since_date = pd.to_datetime(since_date)
    if isinstance(end_date, str):
        end_date = pd.to_datetime(end_date)

    # Step 1: Load raw data from handler (with buffer period, NOT filtered)
    if verbose:
        print("Step 1: Loading raw data from handler (with buffer period)...")
        print(f"  Requested range: {since_date.date()} to {end_date.date()}")
        print(f"  Buffer days: {buffer_days}")

    # Load yaml config
    yaml_path = yaml_path or HANDLER_YAML_PATH
    yaml_path = Path(yaml_path)
    if not yaml_path.exists():
        raise FileNotFoundError(f"handler.yaml not found at {yaml_path}")

    yaml_loader = YAML(typ='safe', pure=True)
    with open(yaml_path) as f:
        config = yaml_loader.load(f)

    # Initialize qlib
    from qlib.workflow.cli import sys_config
    config_path = "qlib.contrib.data.config"
    sys_config(config, config_path)
    qlib.init(**config.get("qlib_init"))

    # Calculate load start (with buffer for diff calculations)
    load_start = since_date - pd.Timedelta(days=buffer_days)

    # Prepare placeholder values
    placeholder_value = {
        "<LOAD_START>": load_start,
        "<LOAD_END>": end_date,
    }

    # Load placeholders from handler module
    try:
        placeholder_value.update(
            load_placehorder_from_module(config["handler"])
        )
    except Exception as e:
        if verbose:
            print(f"  Note: Could not load placeholders from handler module: {e}")

    # Fill placeholders in config
    config = fill_placeholder(config, placeholder_value)

    # Initialize handler and load data
    handler = init_instance_by_config(config["handler"])
    raw_data = handler._data  # Keep as SepDataFrame

    if verbose:
        if hasattr(raw_data, 'to_frame'):
            tmp_df = raw_data.to_frame()
        else:
            tmp_df = raw_data
        print(f"  Loaded data shape (with buffer): {tmp_df.shape}")
        print(f"  Data index levels: {tmp_df.index.names}")

    # Step 2: Dump raw data (before proc_list, filtered to requested range)
    # Filter first for dumping
    if isinstance(raw_data.index, pd.MultiIndex):
        raw_data_filtered = raw_data.loc(axis=0)[slice(since_date, end_date), :]
    else:
        raw_data_filtered = raw_data.loc[slice(since_date, end_date)]

    if hasattr(raw_data_filtered, 'to_frame'):
        raw_df = raw_data_filtered.to_frame()
    else:
        raw_df = raw_data_filtered

    raw_output_path = output_dir / f"raw_data_{since_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.pkl"
    if verbose:
        print(f"\nStep 2: Dumping raw data (filtered) to {raw_output_path}...")
        print(f"  Raw data shape (filtered): {raw_df.shape}")

    with open(raw_output_path, "wb") as f:
        pkl.dump(raw_df, f)

    if verbose:
        print(f"  Saved: {raw_output_path}")

    # Skip proc_list if requested
    if skip_proc_list:
        if verbose:
            print("\nStep 3: Skipping proc_list as requested")
            print()
            print("=" * 60)
            print("Summary:")
            print(f"  Raw data: {raw_df.shape} -> {raw_output_path}")
            print(f"  Processed data: SKIPPED")
            print("=" * 60)
        return raw_df, None

    # Step 3: Load preprocessing pipeline (proc_list)
    if verbose:
        print("\nStep 3: Loading preprocessing pipeline (proc_list)...")
        print(f"  Path: {proc_list_path}")

    with open(proc_list_path, "rb") as f:
        proc_list = pkl.load(f)

    if verbose:
        print(f"  Number of processors: {len(proc_list)}")
        for i, proc in enumerate(proc_list):
            print(f"    [{i}] {type(proc).__name__}")

    # Step 4: Apply proc_list (BEFORE filtering, on data with buffer period)
    if verbose:
        print("\nStep 4: Applying preprocessing pipeline (on data with buffer)...")

    # Convert SepDataFrame to DataFrame for processing
    if hasattr(raw_data, 'to_frame'):
        df_for_proc = raw_data.to_frame()
    else:
        df_for_proc = raw_data.copy()

    # Fill NaN in con_rating_strength if requested (workaround for IntCastingNaNError)
    if fill_con_rating_nan:
        if verbose:
            print("  Pre-processing: Filling NaN in con_rating_strength...")
        df_for_proc = _fill_con_rating_nan(df_for_proc, verbose=verbose)

    # Convert columns from MultiIndex tuples to :: separator format
    # The proc_list was trained on data with 'feature_ext::log_size' format
    df_for_proc = convert_columns_to_double_colon(df_for_proc)

    if verbose:
        print(f"  Converted columns: {list(df_for_proc.columns[:5])}...")

    # Replace Diff processor with FixedDiff - keep all other processors from pickle
    # FixedDiff handles the :: separator format and fixes the column naming bug
    fixed_proc_list = []
    for proc in proc_list:
        proc_name = type(proc).__name__
        if proc_name == 'Diff':
            if verbose:
                print(f"  Replacing Diff with FixedDiff (fields_group={proc.fields_group})")
            fixed_proc = FixedDiff(
                fields_group=proc.fields_group,
                suffix=proc.suffix,
                periods=proc.periods
            )
            fixed_proc_list.append(fixed_proc)
        elif proc_name == 'FlagMarketInjector':
            if verbose:
                print("  Replacing FlagMarketInjector with FixedFlagMarketInjector")
            fixed_proc = FixedFlagMarketInjector(
                fields_group=proc.fields_group,
                vocab_size=getattr(proc, 'vocab_size', 2)
            )
            fixed_proc_list.append(fixed_proc)
        elif proc_name == 'FlagSTInjector':
            if verbose:
                print("  Replacing FlagSTInjector with FixedFlagSTInjector")
            fixed_proc = FixedFlagSTInjector(
                fields_group=proc.fields_group,
                st_group=getattr(proc, 'st_group', 'st_flag'),
                col_name=getattr(proc, 'col_name', 'IsST')
            )
            fixed_proc_list.append(fixed_proc)
        elif proc_name == 'ColumnRemover':
            if verbose:
                print("  Replacing ColumnRemover with FixedColumnRemover")
            fixed_proc = FixedColumnRemover(fields_group=proc.fields_group)
            fixed_proc_list.append(fixed_proc)
        elif proc_name == 'FlagToOnehot':
            if verbose:
                print("  Replacing FlagToOnehot with FixedFlagToOnehot")
            fixed_proc = FixedFlagToOnehot(
                fields_group=proc.fields_group,
                onehot_group=getattr(proc, 'onehot_group', 'industry')
            )
            fixed_proc_list.append(fixed_proc)
        elif proc_name == 'IndusNtrlInjector':
            if verbose:
                print(f"  Replacing IndusNtrlInjector with FixedIndusNtrlInjector (fields_group={proc.fields_group})")
            fixed_proc = FixedIndusNtrlInjector(
                fields_group=proc.fields_group,
                input_group=getattr(proc, 'input_group', proc.fields_group),
                indus_group=getattr(proc, 'indus_group', 'indus_flag'),
                ntrl_suffix=getattr(proc, 'ntrl_suffix', '_ntrl')
            )
            fixed_proc_list.append(fixed_proc)
        elif proc_name == 'RobustZScoreNorm':
            if verbose:
                print("  Replacing RobustZScoreNorm with FixedRobustZScoreNorm (using trained mean/std)")
            fixed_proc = FixedRobustZScoreNorm(
                fields_group=proc.fields_group,
                mean_train=getattr(proc, 'mean_train', None),
                std_train=getattr(proc, 'std_train', None),
                clip_outlier=getattr(proc, 'clip_outlier', True)
            )
            fixed_proc_list.append(fixed_proc)
        elif proc_name == 'Fillna':
            if verbose:
                print("  Replacing Fillna with FixedFillna")
            fixed_proc = FixedFillna(
                fields_group=proc.fields_group,
                fill_value=getattr(proc, 'fill_value', 0)
            )
            fixed_proc_list.append(fixed_proc)
        else:
            # Keep original processor for unknown types
            fixed_proc_list.append(proc)

    try:
        # Apply the fixed proc_list
        if verbose:
            print("\n  Applying fixed preprocessing pipeline...")
        processed_data = apply_proc_list(df_for_proc, proc_list=fixed_proc_list, with_fit=False)

        if verbose:
            print(f"  Processed data shape: {processed_data.shape}")
            print(f"  Processed columns sample: {list(processed_data.columns[:5])}...")

        # Convert columns back to MultiIndex tuples for consistency
        processed_data = convert_columns_from_double_colon(processed_data)

        # Now filter to requested date range
        if isinstance(processed_data.index, pd.MultiIndex):
            processed_data_filtered = processed_data.loc(axis=0)[slice(since_date, end_date), :]
        else:
            processed_data_filtered = processed_data.loc[slice(since_date, end_date)]

        if verbose:
            print(f"  Processed data shape (filtered): {processed_data_filtered.shape}")

        # Step 5: Dump processed data (after proc_list)
        processed_output_path = output_dir / f"processed_data_{since_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.pkl"
        if verbose:
            print(f"\nStep 5: Dumping processed data to {processed_output_path}...")

        with open(processed_output_path, "wb") as f:
            pkl.dump(processed_data_filtered, f)

        if verbose:
            print(f"  Saved: {processed_output_path}")
            print()
            print("=" * 60)
            print("Summary:")
            print(f"  Raw data: {raw_df.shape} -> {raw_output_path}")
            print(f"  Processed data: {processed_data_filtered.shape} -> {processed_output_path}")
            print("=" * 60)

        return raw_df, processed_data_filtered

    except Exception as e:
        if verbose:
            print(f"\nERROR applying proc_list: {e}")
            print("Only raw data was saved. The proc_list may have compatibility issues")
            print("with this data (e.g., NaN values in columns that need int8 conversion).")
            print()
            print("=" * 60)
            print("Summary:")
            print(f"  Raw data: {raw_df.shape} -> {raw_output_path}")
            print(f"  Processed data: FAILED ({type(e).__name__})")
            print("=" * 60)
        return raw_df, None
# Quick smoke test when run as a script
if __name__ == "__main__":
    test_since = "2019-01-01"
    test_end = "2019-01-31"

    print(f"Testing data loader with date range: {test_since} to {test_end}")
    print()

    try:
        df = load_data_from_handler(
            since_date=test_since,
            end_date=test_end,
            buffer_days=20,
            verbose=True
        )
        print(f"\nSuccess! Loaded {len(df)} rows")
        print(f"Date range in data: {df.index.get_level_values('datetime').min()} to {df.index.get_level_values('datetime').max()}")
    except Exception as e:
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()