From 49c9dae181a487edfd50b120d7d8e6b32ba39588 Mon Sep 17 00:00:00 2001
From: guofu
Date: Sat, 14 Feb 2026 11:07:21 +0800
Subject: [PATCH] Add CTA 1D Parquet loader and data requirements
- Add CTA1DLoaderParquet class for loading from Parquet files (parallel to CTA1DLoader)
- Add data_ops_research/ requirements for CTA Parquet dataset export
- Fix broken HFFACTOR_COLS import by defining the column list inline
- Update exports in __init__.py files
New files:
- cta_1d/src/loader_parquet.py: Parquet-based loader (usage sketch below)
- cta_1d/config_parquet.yaml: Parquet-specific config
- data_ops_research/cta_1d/requirements.yaml: Data export requirements
- data_ops_research/cta_1d/README.md: Dataset documentation
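
Usage sketch (mirrors the docstring in loader_parquet.py; all paths fall
back to the defaults baked into the class):

    from cta_1d import CTA1DLoaderParquet

    loader = CTA1DLoaderParquet(
        return_type='o2c_twap1min',
        normalization='dual',
        feature_sets=['alpha158', 'hffactor'],
    )
    dataset = loader.load(dt_range=['2020-01-01', '2023-12-31'])
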
Co-Authored-By: Claude Sonnet 4.5
---
cta_1d/__init__.py | 3 +
cta_1d/config_parquet.yaml | 41 ++
cta_1d/src/__init__.py | 3 +
cta_1d/src/loader.py | 68 ++--
cta_1d/src/loader_parquet.py | 427 +++++++++++++++++++++
data_ops_research/cta_1d/README.md | 44 +++
data_ops_research/cta_1d/requirements.yaml | 88 +++++
7 files changed, 644 insertions(+), 30 deletions(-)
create mode 100644 cta_1d/config_parquet.yaml
create mode 100644 cta_1d/src/loader_parquet.py
create mode 100644 data_ops_research/cta_1d/README.md
create mode 100644 data_ops_research/cta_1d/requirements.yaml
diff --git a/cta_1d/__init__.py b/cta_1d/__init__.py
index f7c6774..c3975e5 100644
--- a/cta_1d/__init__.py
+++ b/cta_1d/__init__.py
@@ -47,6 +47,7 @@ Backtesting:
# Re-export all public APIs from src submodules
from .src import (
CTA1DLoader,
+ CTA1DLoaderParquet,
get_blend_weights,
describe_blend_config,
BLEND_CONFIGS,
@@ -57,6 +58,7 @@ try:
from .src import run_backtest, BacktestConfig
__all__ = [
'CTA1DLoader',
+ 'CTA1DLoaderParquet',
'get_blend_weights',
'describe_blend_config',
'BLEND_CONFIGS',
@@ -69,6 +71,7 @@ except ImportError:
# xgboost or sklearn not installed
__all__ = [
'CTA1DLoader',
+ 'CTA1DLoaderParquet',
'get_blend_weights',
'describe_blend_config',
'BLEND_CONFIGS',
diff --git a/cta_1d/config_parquet.yaml b/cta_1d/config_parquet.yaml
new file mode 100644
index 0000000..1126d8b
--- /dev/null
+++ b/cta_1d/config_parquet.yaml
@@ -0,0 +1,41 @@
+# CTA 1D Configuration for Parquet Loading
+# Use this instead of config.yaml when using CTA1DLoaderParquet
+
+data:
+ dt_range: ['2020-01-01', '2023-12-31']
+
+ # Parquet paths (populated by data_ops_research)
+ feature_path: /data/parquet/dataset/cta_alpha158_1d
+ hffactor_path: /data/parquet/dataset/cta_hffactor_1d
+ dom_path: /data/parquet/dataset/cta_dom_1d
+ label_path: /data/parquet/dataset/cta_labels_1d
+
+ feature_sets:
+ - alpha158
+ - hffactor
+ normalization: dual
+ blend_weights: equal
+
+segments:
+ train: ['2020-01-01', '2022-06-30']
+ valid: ['2022-07-01', '2022-12-31']
+ test: ['2023-01-01', '2023-12-31']
+
+model:
+ type: xgb
+ params:
+ objective: reg:squarederror
+ eval_metric: rmse
+ eta: 0.05
+ max_depth: 6
+ subsample: 0.8
+ colsample_bytree: 0.8
+ seed: 42
+ num_boost_round: 500
+ early_stopping_rounds: 50
+
+training:
+ return_type: o2c_twap1min
+ weight_factors:
+ positive: 1.0
+ negative: 2.0
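+
+# A minimal sketch of wiring this config into the loader (hypothetical
+# driver script; the loader accepts these fields as constructor kwargs):
+#
+#   import yaml
+#   from cta_1d import CTA1DLoaderParquet
+#
+#   cfg = yaml.safe_load(open('cta_1d/config_parquet.yaml'))
+#   loader = CTA1DLoaderParquet(
+#       return_type=cfg['training']['return_type'],
+#       normalization=cfg['data']['normalization'],
+#       feature_sets=cfg['data']['feature_sets'],
+#       blend_weights=cfg['data']['blend_weights'],
+#       weight_factors=cfg['training']['weight_factors'],
+#       feature_path=cfg['data']['feature_path'],
+#       hffactor_path=cfg['data']['hffactor_path'],
+#       dom_path=cfg['data']['dom_path'],
+#       label_path=cfg['data']['label_path'],
+#   )
+#   dataset = loader.load(dt_range=cfg['data']['dt_range'])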
diff --git a/cta_1d/src/__init__.py b/cta_1d/src/__init__.py
index 3aa7baf..8679965 100644
--- a/cta_1d/src/__init__.py
+++ b/cta_1d/src/__init__.py
@@ -1,6 +1,7 @@
"""CTA 1-day task-specific utilities."""
from .loader import CTA1DLoader
+from .loader_parquet import CTA1DLoaderParquet
from .labels import get_blend_weights, describe_blend_config, BLEND_CONFIGS
try:
@@ -8,6 +9,7 @@ try:
from .backtest import run_backtest, BacktestConfig
__all__ = [
'CTA1DLoader',
+ 'CTA1DLoaderParquet',
'train_model',
'TrainConfig',
'run_backtest',
@@ -20,6 +22,7 @@ except ImportError:
# xgboost or sklearn not installed
__all__ = [
'CTA1DLoader',
+ 'CTA1DLoaderParquet',
'get_blend_weights',
'describe_blend_config',
'BLEND_CONFIGS',
diff --git a/cta_1d/src/loader.py b/cta_1d/src/loader.py
index 2c0dc46..6b3f3ae 100644
--- a/cta_1d/src/loader.py
+++ b/cta_1d/src/loader.py
@@ -15,10 +15,21 @@ import polars as pl
from qshare.data import pl_Dataset, pl_pipe, pl_clip, pl_cs_zscore
from qshare.data.universal import DataSpec
from qshare.io.ddb import get_ddb_sess, reset_index_from_ddb
-from qshare.config.research.cta.features import HFFACTOR_COLS
from .labels import get_blend_weights
+# HFFACTOR columns, defined inline because qshare.config.research is unavailable
+HFFACTOR_COLS = [
+ 'vol_1min',
+ 'skew_1min',
+ 'volp_1min',
+ 'volp_ratio_1min',
+ 'voln_ratio_1min',
+ 'trend_strength_1min',
+ 'pv_corr_1min',
+ 'flowin_ratio_1min',
+]
+
@dataclass
class CTA1DLoader:
@@ -97,9 +108,13 @@ class CTA1DLoader:
df = df_features.join(df_label, on=['datetime', 'instrument'], how='inner')
# Filter to requested date range
+ # Convert string dates to date objects for proper comparison
+ from datetime import datetime
+ start_dt = datetime.strptime(start_date, '%Y-%m-%d').date()
+ end_dt = datetime.strptime(end_date, '%Y-%m-%d').date()
df = df.filter(
- (pl.col('datetime') >= start_date) &
- (pl.col('datetime') <= end_date)
+ (pl.col('datetime') >= start_dt) &
+ (pl.col('datetime') <= end_dt)
)
# Calculate weights
@@ -175,12 +190,11 @@ class CTA1DLoader:
since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d')
# Load from factor table
- df = sess.run(f"""
- select code, m_nDate, factor_name, value
- from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hffactor')
- where m_nDate >= {since_ddb}
- and factor_name in [{','.join([f"'{c}'" for c in HFFACTOR_COLS])}]
- """)
+ factor_list = ','.join([f"'{c}'" for c in HFFACTOR_COLS])
+ query = f"""select code, m_nDate, factor_name, value
+from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hffactor')
+where m_nDate >= {since_ddb} and factor_name in [{factor_list}]"""
+ df = sess.run(query)
# Pivot to wide format
df = df.pivot_table(
@@ -263,20 +277,16 @@ class CTA1DLoader:
"""Apply specified normalization to label."""
fit_start, fit_end = fit_range
- # Ensure datetime column is string for comparison
- if pl_df['datetime'].dtype == pl.Date:
- pl_df = pl_df.with_columns(
- pl.col('datetime').dt.strftime('%Y-%m-%d').alias('datetime_str')
- )
- date_col = 'datetime_str'
- else:
- date_col = 'datetime'
+ # Convert fit_range strings to date objects for comparison
+ from datetime import datetime
+ fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
+ fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()
if self.normalization == 'zscore':
# Calculate mean/std on fit range
fit_data = pl_df.filter(
- (pl.col(date_col) >= fit_start) &
- (pl.col(date_col) <= fit_end)
+ (pl.col('datetime') >= fit_start_date) &
+ (pl.col('datetime') <= fit_end_date)
)
mean = fit_data['ret'].mean()
std = fit_data['ret'].std()
@@ -334,17 +344,15 @@ class CTA1DLoader:
"""Create z-score normalized label."""
fit_start, fit_end = fit_range
- # Handle date type conversion for comparison
- if pl_df['datetime'].dtype == pl.Date:
- fit_data = pl_df.filter(
- (pl.col('datetime').dt.strftime('%Y-%m-%d') >= fit_start) &
- (pl.col('datetime').dt.strftime('%Y-%m-%d') <= fit_end)
- )
- else:
- fit_data = pl_df.filter(
- (pl.col('datetime') >= fit_start) &
- (pl.col('datetime') <= fit_end)
- )
+ # Convert fit_range strings to date objects for comparison
+ from datetime import datetime
+ fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
+ fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()
+
+ fit_data = pl_df.filter(
+ (pl.col('datetime') >= fit_start_date) &
+ (pl.col('datetime') <= fit_end_date)
+ )
mean = fit_data['ret'].mean()
std = fit_data['ret'].std()
diff --git a/cta_1d/src/loader_parquet.py b/cta_1d/src/loader_parquet.py
new file mode 100644
index 0000000..6f927dc
--- /dev/null
+++ b/cta_1d/src/loader_parquet.py
@@ -0,0 +1,427 @@
+"""CTA 1D loader using Parquet/Polars (parallel to CTA1DLoader)."""
+
+from dataclasses import dataclass
+from datetime import datetime
+from typing import List, Optional
+
+import pandas as pd
+import polars as pl
+
+from qshare.data import pl_Dataset
+from qshare.io.polars import load_from_pq
+
+from .labels import get_blend_weights
+
+# HFFACTOR columns (defined inline to avoid qshare.config.research dependency)
+HFFACTOR_COLS = [
+ 'vol_1min',
+ 'skew_1min',
+ 'volp_1min',
+ 'volp_ratio_1min',
+ 'voln_ratio_1min',
+ 'trend_strength_1min',
+ 'pv_corr_1min',
+ 'flowin_ratio_1min',
+]
+
+
+@dataclass
+class CTA1DLoaderParquet:
+ """CTA 1D loader using Parquet files instead of DolphinDB.
+
+ This is a parallel implementation to CTA1DLoader that reads from
+ pre-exported Parquet tables instead of querying DolphinDB directly.
+
+ Example:
+ >>> loader = CTA1DLoaderParquet(
+ ... return_type='o2c_twap1min',
+ ... normalization='dual',
+ ... feature_sets=['alpha158', 'hffactor']
+ ... )
+ >>> dataset = loader.load(dt_range=['2020-01-01', '2023-12-31'])
+ >>> dataset = dataset.with_segments({
+ ... 'train': ('2020-01-01', '2022-12-31'),
+ ... 'test': ('2023-01-01', '2023-12-31')
+ ... })
+ >>> X_train, y_train, w_train = dataset.split('train').to_numpy()
+ """
+
+ return_type: str = 'o2c_twap1min'
+ normalization: str = 'dual'
+ feature_sets: Optional[List[str]] = None
+ weight_factors: Optional[dict] = None
+ blend_weights: str | List[float] | None = None
+
+ # Parquet paths (populated by data_ops_research)
+ feature_path: str = '/data/parquet/dataset/cta_alpha158_1d'
+ hffactor_path: str = '/data/parquet/dataset/cta_hffactor_1d'
+ dom_path: str = '/data/parquet/dataset/cta_dom_1d'
+ label_path: str = '/data/parquet/dataset/cta_labels_1d'
+
+ label_cap_upper: float = 0.5
+ label_cap_lower: float = -0.5
+
+ def __post_init__(self):
+ if self.feature_sets is None:
+ self.feature_sets = ['alpha158', 'hffactor']
+ if self.weight_factors is None:
+ self.weight_factors = {'positive': 1.0, 'negative': 2.0}
+
+ def load(
+ self,
+ dt_range: List[str],
+ fit_range: Optional[List[str]] = None
+ ) -> pl_Dataset:
+ """Load and prepare CTA 1-day training dataset from Parquet files.
+
+ Args:
+ dt_range: Date range [start_date, end_date] for dataset
+ fit_range: Date range [start, end] for fitting normalization params.
+ If None, uses first 60% of dt_range.
+
+ Returns:
+ pl_Dataset with features, label, and weight columns
+ """
+ start_date, end_date = dt_range
+
+ if fit_range is None:
+ # Default: use first 60% for fit
+ all_dates = pd.date_range(start_date, end_date)
+ split_idx = int(len(all_dates) * 0.6)
+ fit_range = [
+ all_dates[0].strftime('%Y-%m-%d'),
+ all_dates[split_idx].strftime('%Y-%m-%d')
+ ]
+
+ # Load extended history for rolling normalization
+ load_start = (pd.to_datetime(start_date) - pd.Timedelta(days=120)).strftime('%Y-%m-%d')
+
+ # Load features
+ df_features = self._load_features(load_start, end_date)
+
+ # Load and normalize labels
+ df_label = self._load_labels(load_start, end_date, fit_range)
+
+ # Combine
+ df = df_features.join(df_label, on=['datetime', 'instrument'], how='inner')
+
+ # Filter to requested date range
+ start_dt = datetime.strptime(start_date, '%Y-%m-%d').date()
+ end_dt = datetime.strptime(end_date, '%Y-%m-%d').date()
+ df = df.filter(
+ (pl.col('datetime') >= start_dt) &
+ (pl.col('datetime') <= end_dt)
+ )
+
+ # Calculate weights
+ df = self._calculate_weights(df)
+
+ # Clean data
+ df = self._clean_data(df)
+
+ # Get feature columns
+ feature_cols = [c for c in df.columns
+ if any(c.startswith(prefix) for prefix in ['f_a158_', 'f_hf_'])]
+
+ return pl_Dataset(
+ data=df,
+ features=feature_cols,
+ label='label',
+ weight='weight' if self.weight_factors else None
+ )
+
+ def _load_features(self, start_date: str, end_date: str) -> pl.DataFrame:
+ """Load feature data from Parquet files."""
+ feature_dfs = []
+
+ if 'alpha158' in self.feature_sets:
+ df_alpha = self._load_alpha158(start_date, end_date)
+ feature_dfs.append(df_alpha)
+
+ if 'hffactor' in self.feature_sets:
+ df_hf = self._load_hffactor(start_date, end_date)
+ feature_dfs.append(df_hf)
+
+ # Join all feature sets
+ result = feature_dfs[0]
+ for df in feature_dfs[1:]:
+ result = result.join(df, on=['datetime', 'instrument'], how='inner')
+
+ return result
+
+ def _load_alpha158(self, start_date: str, end_date: str) -> pl.DataFrame:
+ """Load alpha158 features from Parquet."""
+ # Load using qshare's parquet loader
+ df = load_from_pq(
+ self.feature_path,
+ start_date=start_date,
+ end_date=end_date
+ )
+
+ # Drop non-numeric columns if present
+ if 'code_init' in df.columns:
+ df = df.drop('code_init')
+
+ # Rename columns with prefix
+ rename_map = {
+ c: f'f_a158_{c}'
+ for c in df.columns
+ if c not in ['datetime', 'instrument']
+ }
+ df = df.rename(rename_map)
+
+ return df
+
+ def _load_hffactor(self, start_date: str, end_date: str) -> pl.DataFrame:
+ """Load hffactor features from Parquet (already pivoted)."""
+ # Load using qshare's parquet loader
+ df = load_from_pq(
+ self.hffactor_path,
+ start_date=start_date,
+ end_date=end_date
+ )
+
+ # Rename columns with prefix
+ rename_map = {
+ c: f'f_hf_{c}'
+ for c in df.columns
+ if c not in ['datetime', 'instrument']
+ }
+ df = df.rename(rename_map)
+
+ return df
+
+ def _load_labels(
+ self,
+ start_date: str,
+ end_date: str,
+ fit_range: List[str]
+ ) -> pl.DataFrame:
+ """Load and normalize labels from Parquet files."""
+ # Map return type to indicator name
+ indicator_map = {
+ 'o2c_twap1min': 'twap_open1m@1_twap_close1m@1',
+ 'o2o_twap1min': 'twap_open1m@1_twap_open1m@2',
+ }
+ indicator = indicator_map.get(self.return_type, self.return_type)
+
+ # Load dominant contract mapping from Parquet
+ df_contract = load_from_pq(
+ self.dom_path,
+ start_date=start_date,
+ end_date=end_date
+ )
+
+ # Load returns from Parquet
+ df_return = load_from_pq(
+ self.label_path,
+ start_date=start_date,
+ end_date=end_date
+ )
+
+ # Filter for the specific indicator
+ df_return = df_return.filter(pl.col('indicator') == indicator)
+
+ # Merge with dominant contract mapping
+ df_return = df_return.join(
+ df_contract,
+ on=['datetime', 'instrument'],
+ how='inner'
+ )
+
+ # Use code_init as the instrument (continuous contract)
+ df_return = df_return.with_columns(
+ (pl.col('code_init') + 'Ind').alias('instrument')
+ )
+
+ # Select and rename return column
+ df_return = df_return.select([
+ 'datetime',
+ 'instrument',
+ pl.col('value').alias('ret')
+ ])
+
+ # Apply normalization
+ df_return = self._normalize_label(df_return, fit_range)
+
+ return df_return
+
+ def _normalize_label(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame:
+ """Apply specified normalization to label."""
+ fit_start, fit_end = fit_range
+
+ # Convert fit_range strings to date objects for comparison
+ fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
+ fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()
+
+ if self.normalization == 'zscore':
+ # Calculate mean/std on fit range
+ fit_data = pl_df.filter(
+ (pl.col('datetime') >= fit_start_date) &
+ (pl.col('datetime') <= fit_end_date)
+ )
+ mean = fit_data['ret'].mean()
+ std = fit_data['ret'].std()
+
+ result = pl_df.with_columns(
+ ((pl.col('ret') - mean) / std).clip(
+ self.label_cap_lower, self.label_cap_upper
+ ).alias('label')
+ ).select(['datetime', 'instrument', 'label'])
+ return result
+
+ elif self.normalization == 'cs_zscore':
+ # Cross-sectional z-score per datetime
+ return pl_df.with_columns(
+ ((pl.col('ret') - pl.col('ret').mean().over('datetime')) /
+ pl.col('ret').std().over('datetime')).clip(
+ self.label_cap_lower, self.label_cap_upper
+ ).alias('label')
+ ).select(['datetime', 'instrument', 'label'])
+
+ elif self.normalization == 'rolling_20':
+ return self._apply_rolling_norm(pl_df, window=20, fit_range=fit_range)
+
+ elif self.normalization == 'rolling_60':
+ return self._apply_rolling_norm(pl_df, window=60, fit_range=fit_range)
+
+ elif self.normalization == 'dual':
+ # Create all normalization variants
+ label_zscore = self._normalize_zscore(pl_df, fit_range)
+ label_cszscore = self._normalize_cs_zscore(pl_df)
+ label_roll20 = self._normalize_rolling(pl_df, window=20, fit_range=fit_range)
+ label_roll60 = self._normalize_rolling(pl_df, window=60, fit_range=fit_range)
+
+ # Get blend weights
+ weights = get_blend_weights(self.blend_weights)
+
+ # Join and blend
+ pl_df = label_zscore.join(label_cszscore, on=['datetime', 'instrument'])
+ pl_df = pl_df.join(label_roll20, on=['datetime', 'instrument'])
+ pl_df = pl_df.join(label_roll60, on=['datetime', 'instrument'])
+
+ return pl_df.with_columns(
+ (weights[0] * pl.col('label_zscore') +
+ weights[1] * pl.col('label_cszscore') +
+ weights[2] * pl.col('label_roll20') +
+ weights[3] * pl.col('label_roll60')).clip(
+ self.label_cap_lower, self.label_cap_upper
+ ).alias('label')
+ ).select(['datetime', 'instrument', 'label'])
+
+ else:
+ raise ValueError(f"Unknown normalization: {self.normalization}")
+
+ def _normalize_zscore(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame:
+ """Create z-score normalized label."""
+ fit_start, fit_end = fit_range
+
+ fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
+ fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()
+
+ fit_data = pl_df.filter(
+ (pl.col('datetime') >= fit_start_date) &
+ (pl.col('datetime') <= fit_end_date)
+ )
+
+ mean = fit_data['ret'].mean()
+ std = fit_data['ret'].std()
+
+ return pl_df.with_columns(
+ ((pl.col('ret') - mean) / std).alias('label_zscore')
+ ).select(['datetime', 'instrument', 'label_zscore'])
+
+ def _normalize_cs_zscore(self, pl_df: pl.DataFrame) -> pl.DataFrame:
+ """Create cross-sectional z-score normalized label."""
+ return pl_df.with_columns(
+ ((pl.col('ret') - pl.col('ret').mean().over('datetime')) /
+ pl.col('ret').std().over('datetime')).alias('label_cszscore')
+ ).select(['datetime', 'instrument', 'label_cszscore'])
+
+ def _normalize_rolling(
+ self,
+ pl_df: pl.DataFrame,
+ window: int,
+ fit_range: List[str]
+ ) -> pl.DataFrame:
+ """Create rolling window normalized label."""
+ # Convert to pandas for rolling calculation
+ pd_df = pl_df.to_pandas().set_index(['datetime', 'instrument'])
+
+ # Unstack to wide format
+ df_wide = pd_df['ret'].unstack('instrument')
+
+ # Calculate rolling mean and std
+ rolling_mean = df_wide.rolling(window=window, min_periods=window//2).mean()
+ rolling_std = df_wide.rolling(window=window, min_periods=window//2).std()
+
+ # Normalize
+ df_normalized = (df_wide - rolling_mean) / rolling_std
+
+ # Restack
+ rolling_label = df_normalized.stack().reset_index()
+ rolling_label.columns = ['datetime', 'instrument', f'label_roll{window}']
+
+ return pl.from_pandas(rolling_label)
+
+ def _apply_rolling_norm(
+ self,
+ pl_df: pl.DataFrame,
+ window: int,
+ fit_range: List[str]
+ ) -> pl.DataFrame:
+ """Apply rolling normalization and cap."""
+ result = self._normalize_rolling(pl_df, window, fit_range)
+ return result.with_columns(
+ pl.col(f'label_roll{window}').clip(
+ self.label_cap_lower, self.label_cap_upper
+ ).alias('label')
+ ).select(['datetime', 'instrument', 'label'])
+
+ def _calculate_weights(self, pl_df: pl.DataFrame) -> pl.DataFrame:
+ """Calculate sample weights based on return magnitude."""
+ # Base weights by return magnitude tiers
+ pl_df = pl_df.with_columns(
+ pl.when(pl.col('label').abs() > 1.5).then(pl.lit(2.5))
+ .when(pl.col('label').abs() > 1.0).then(pl.lit(2.0))
+ .when(pl.col('label').abs() > 0.5).then(pl.lit(1.5))
+ .when(pl.col('label').abs() > 0.2).then(pl.lit(1.0))
+ .otherwise(0.0).alias('weight')
+ )
+
+ # Apply negative return multiplier
+ if self.weight_factors.get('negative'):
+ pl_df = pl_df.with_columns(
+ pl.when(pl.col('label') < -0.5)
+ .then(pl.col('weight') * self.weight_factors['negative'])
+ .otherwise(pl.col('weight'))
+ .alias('weight')
+ )
+
+ # Apply positive return multiplier
+ if self.weight_factors.get('positive'):
+ pl_df = pl_df.with_columns(
+ pl.when(pl.col('label') > 0.5)
+ .then(pl.col('weight') * self.weight_factors['positive'])
+ .otherwise(pl.col('weight'))
+ .alias('weight')
+ )
+
+ return pl_df
+
+ def _clean_data(self, pl_df: pl.DataFrame) -> pl.DataFrame:
+ """Clean data: remove inf/nan values."""
+ # Only float columns can hold inf values (polars raises on
+ # is_infinite() for integer dtypes)
+ float_cols = [
+ c for c in pl_df.columns
+ if pl_df[c].dtype in [pl.Float32, pl.Float64]
+ ]
+
+ # Replace inf with null, then drop all rows containing nulls
+ pl_df = pl_df.with_columns([
+ pl.when(pl.col(c).is_infinite()).then(None).otherwise(pl.col(c)).alias(c)
+ for c in float_cols
+ ])
+
+ return pl_df.drop_nulls()
diff --git a/data_ops_research/cta_1d/README.md b/data_ops_research/cta_1d/README.md
new file mode 100644
index 0000000..613ae80
--- /dev/null
+++ b/data_ops_research/cta_1d/README.md
@@ -0,0 +1,44 @@
+# CTA 1D Parquet Dataset
+
+This directory specifies the export requirements for the CTA (Commodity
+Trading Advisor) futures Parquet datasets consumed by alpha_lab.
+
+## Tables
+
+### cta_alpha158_1d
+Alpha158 features for CTA futures.
+- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_alpha159_0_7_beta`
+- **Output**: `/data/parquet/dataset/cta_alpha158_1d/`
+- **Columns**: ~163 feature columns + code, m_nDate
+
+### cta_hffactor_1d
+High-frequency factor features (8 columns).
+- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_hffactor`
+- **Output**: `/data/parquet/dataset/cta_hffactor_1d/`
+- **Transformation**: Pivot from long to wide format (see the sketch below)
+ - Input columns: code, m_nDate, factor_name, value
+ - Output columns: code, m_nDate, vol_1min, skew_1min, ... (8 features)
+- **Filter**: Only include factor_name in [vol_1min, skew_1min, volp_1min,
+ volp_ratio_1min, voln_ratio_1min, trend_strength_1min, pv_corr_1min,
+ flowin_ratio_1min]
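+
+A minimal Polars sketch of the intended pivot, assuming `df_long` already
+holds the long-format rows (illustrative only; the export job may instead
+pivot inside DolphinDB or pandas):
+
+```python
+import polars as pl
+
+FACTORS = [
+    'vol_1min', 'skew_1min', 'volp_1min', 'volp_ratio_1min',
+    'voln_ratio_1min', 'trend_strength_1min', 'pv_corr_1min',
+    'flowin_ratio_1min',
+]
+
+wide = (
+    df_long  # hypothetical frame with columns: code, m_nDate, factor_name, value
+    .filter(pl.col('factor_name').is_in(FACTORS))
+    # note: Polars < 1.0 spells `on=` as `columns=`
+    .pivot(on='factor_name', index=['code', 'm_nDate'], values='value')
+)
+```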
+
+### cta_dom_1d
+Dominant contract mapping for continuous contracts.
+- **Source**: `dfs://daily_stock_run.dwm_1day_cta_dom`
+- **Output**: `/data/parquet/dataset/cta_dom_1d/`
+- **Filter**: version = 'vp_csmax_roll2_cummax'
+- **Aggregation**: GROUP BY m_nDate, code_init; SELECT first(code) as code (query sketch below)
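+
+A sketch of the export query, reusing the DolphinDB session helper already
+used by `cta_1d/src/loader.py` (the surrounding export tooling is assumed):
+
+```python
+from qshare.io.ddb import get_ddb_sess
+
+sess = get_ddb_sess()  # connection details omitted; see loader.py
+df = sess.run("""
+select first(code) as code
+from loadTable('dfs://daily_stock_run', 'dwm_1day_cta_dom')
+where version='vp_csmax_roll2_cummax'
+group by m_nDate, code_init
+""")
+```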
+
+### cta_labels_1d
+Return labels for different return types.
+- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_hfvalue`
+- **Output**: `/data/parquet/dataset/cta_labels_1d/`
+- **Filter**: indicator in [twap_open1m@1_twap_close1m@1, twap_open1m@1_twap_open1m@2]
+- **Columns**: code, m_nDate, indicator, value
+
+## Consumer
+
+Used by: `alpha_lab/cta_1d/src/loader_parquet.py`
+
+alpha_lab provides a parallel loader, `CTA1DLoaderParquet`, that reads these
+Parquet tables instead of querying DolphinDB.
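+
+Example, mirroring the loader's docstring (paths default to the
+`/data/parquet/dataset/...` locations listed above):
+
+```python
+from cta_1d import CTA1DLoaderParquet
+
+loader = CTA1DLoaderParquet(
+    return_type='o2c_twap1min',
+    normalization='dual',
+    feature_sets=['alpha158', 'hffactor'],
+)
+dataset = loader.load(dt_range=['2020-01-01', '2023-12-31'])
+```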
diff --git a/data_ops_research/cta_1d/requirements.yaml b/data_ops_research/cta_1d/requirements.yaml
new file mode 100644
index 0000000..b751b05
--- /dev/null
+++ b/data_ops_research/cta_1d/requirements.yaml
@@ -0,0 +1,88 @@
+# CTA 1D Parquet Dataset Requirements
+# This file specifies the Parquet tables required by the alpha_lab CTA 1D task
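+#
+# Each top-level key maps a dataset name to a `source` (DolphinDB table) and
+# a `target` (Parquet output spec). Sketch of iterating the spec from Python
+# (hypothetical driver; none ships with this patch):
+#
+#   import yaml
+#
+#   spec = yaml.safe_load(open('data_ops_research/cta_1d/requirements.yaml'))
+#   for name, req in spec.items():
+#       print(name, req['source']['table'], '->', req['target']['path'])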
+
+# Table 1: Alpha158 Features
+cta_alpha158_1d:
+ source:
+ database: dfs://daily_stock_run
+ table: stg_1day_tinysoft_cta_alpha159_0_7_beta
+ host: 192.168.1.146
+ port: 8848
+ target:
+ path: cta_alpha158_1d/
+ partition_freq: 1D
+ col_datetime: m_nDate
+ code_format: tscode
+ description: Alpha158 features for CTA futures (~163 columns)
+ priority: medium
+
+# Table 2: HFFactor Features (requires pivot)
+cta_hffactor_1d:
+ source:
+ database: dfs://daily_stock_run
+ table: stg_1day_tinysoft_cta_hffactor
+ host: 192.168.1.146
+ port: 8848
+ # Long format: code, m_nDate, factor_name, value
+ # Pivot to wide format during export
+ pivot:
+ index: [code, m_nDate]
+ columns: factor_name
+ values: value
+ filter: # Only these 8 columns needed
+ - vol_1min
+ - skew_1min
+ - volp_1min
+ - volp_ratio_1min
+ - voln_ratio_1min
+ - trend_strength_1min
+ - pv_corr_1min
+ - flowin_ratio_1min
+ target:
+ path: cta_hffactor_1d/
+ partition_freq: 1D
+ col_datetime: m_nDate
+ code_format: tscode
+ description: High-frequency factor features (8 columns, pivoted from long format)
+ priority: medium
+ notes: Requires pivot transformation from long to wide format
+
+# Table 3: Dominant Contract Mapping
+cta_dom_1d:
+ source:
+ database: dfs://daily_stock_run
+ table: dwm_1day_cta_dom
+ host: 192.168.1.146
+ port: 8848
+ # Group and aggregate during export
+ group_by: [m_nDate, code_init]
+ filter: "version='vp_csmax_roll2_cummax'"
+ agg: "first(code) as code"
+ target:
+ path: cta_dom_1d/
+ partition_freq: 1D
+ col_datetime: m_nDate
+ code_format: tscode
+ description: Dominant contract mapping for continuous contracts
+ priority: medium
+ notes: Requires group_by + aggregation, filter by version
+
+# Table 4: Return Labels
+cta_labels_1d:
+ source:
+ database: dfs://daily_stock_run
+ table: stg_1day_tinysoft_cta_hfvalue
+ host: 192.168.1.146
+ port: 8848
+ # Filter for specific indicators
+ indicators:
+ - twap_open1m@1_twap_close1m@1 # o2c_twap1min
+ - twap_open1m@1_twap_open1m@2 # o2o_twap1min
+ target:
+ path: cta_labels_1d/
+ partition_freq: 1D
+ col_datetime: m_nDate
+ code_format: tscode
+ description: Return labels for different return types
+ priority: medium
+ notes: Filter indicator column for specific return types