Add CTA 1D Parquet loader and data requirements

- Add CTA1DLoaderParquet class for loading from Parquet files (parallel to CTA1DLoader)
- Add data_ops_research/ requirements for CTA Parquet dataset export
- Fix broken HFFACTOR_COLS import by defining the column list inline
- Update exports in __init__.py files

New files:
- cta_1d/src/loader_parquet.py: Parquet-based loader
- cta_1d/config_parquet.yaml: Parquet-specific config
- data_ops_research/cta_1d/requirements.yaml: Data export requirements
- data_ops_research/cta_1d/README.md: Dataset documentation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
master · guofu committed 2 weeks ago · parent 19f7c522e4 · commit 49c9dae181

cta_1d/__init__.py
@@ -47,6 +47,7 @@ Backtesting:
 # Re-export all public APIs from src submodules
 from .src import (
     CTA1DLoader,
+    CTA1DLoaderParquet,
     get_blend_weights,
     describe_blend_config,
     BLEND_CONFIGS,
@@ -57,6 +58,7 @@ try:
     from .src import run_backtest, BacktestConfig
     __all__ = [
         'CTA1DLoader',
+        'CTA1DLoaderParquet',
         'get_blend_weights',
         'describe_blend_config',
         'BLEND_CONFIGS',
@@ -69,6 +71,7 @@ except ImportError:
     # xgboost or sklearn not installed
     __all__ = [
         'CTA1DLoader',
+        'CTA1DLoaderParquet',
         'get_blend_weights',
         'describe_blend_config',
         'BLEND_CONFIGS',

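With the updated exports, the new loader becomes importable from the package root. A minimal smoke test (assumes the repo root is on `sys.path` and qshare is installed, since importing the package pulls in `src/loader.py`):

```python
# Hedged smoke test for the export change; environment assumptions noted above
from cta_1d import CTA1DLoaderParquet

print(CTA1DLoaderParquet)  # <class 'cta_1d.src.loader_parquet.CTA1DLoaderParquet'>
```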
cta_1d/config_parquet.yaml
@@ -0,0 +1,41 @@
# CTA 1D Configuration for Parquet Loading
# Use this instead of config.yaml when using CTA1DLoaderParquet

data:
  dt_range: ['2020-01-01', '2023-12-31']

  # Parquet paths (populated by data_ops_research)
  feature_path: /data/parquet/dataset/cta_alpha158_1d
  hffactor_path: /data/parquet/dataset/cta_hffactor_1d
  dom_path: /data/parquet/dataset/cta_dom_1d
  label_path: /data/parquet/dataset/cta_labels_1d

  feature_sets:
    - alpha158
    - hffactor

  normalization: dual
  blend_weights: equal

segments:
  train: ['2020-01-01', '2022-06-30']
  valid: ['2022-07-01', '2022-12-31']
  test: ['2023-01-01', '2023-12-31']

model:
  type: xgb
  params:
    objective: reg:squarederror
    eval_metric: rmse
    eta: 0.05
    max_depth: 6
    subsample: 0.8
    colsample_bytree: 0.8
    seed: 42
  num_boost_round: 500
  early_stopping_rounds: 50

training:
  return_type: o2c_twap1min
  weight_factors:
    positive: 1.0
    negative: 2.0

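A sketch of how this config might be wired to the new loader; the commit does not ship a config-driven entry point, so the field mapping and import path below are assumptions:

```python
import yaml

from cta_1d.src.loader_parquet import CTA1DLoaderParquet

# Path assumed relative to the repo root
with open('cta_1d/config_parquet.yaml') as f:
    cfg = yaml.safe_load(f)

data_cfg = cfg['data']
loader = CTA1DLoaderParquet(
    return_type=cfg['training']['return_type'],
    normalization=data_cfg['normalization'],
    feature_sets=data_cfg['feature_sets'],
    blend_weights=data_cfg['blend_weights'],
    weight_factors=cfg['training']['weight_factors'],
    feature_path=data_cfg['feature_path'],
    hffactor_path=data_cfg['hffactor_path'],
    dom_path=data_cfg['dom_path'],
    label_path=data_cfg['label_path'],
)
dataset = loader.load(dt_range=data_cfg['dt_range'])
```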
cta_1d/src/__init__.py
@@ -1,6 +1,7 @@
 """CTA 1-day task-specific utilities."""
 from .loader import CTA1DLoader
+from .loader_parquet import CTA1DLoaderParquet
 from .labels import get_blend_weights, describe_blend_config, BLEND_CONFIGS

 try:
@@ -8,6 +9,7 @@ try:
     from .backtest import run_backtest, BacktestConfig
     __all__ = [
         'CTA1DLoader',
+        'CTA1DLoaderParquet',
         'train_model',
         'TrainConfig',
         'run_backtest',
@@ -20,6 +22,7 @@ except ImportError:
     # xgboost or sklearn not installed
     __all__ = [
         'CTA1DLoader',
+        'CTA1DLoaderParquet',
         'get_blend_weights',
         'describe_blend_config',
         'BLEND_CONFIGS',

cta_1d/src/loader.py
@@ -15,10 +15,21 @@ import polars as pl
 from qshare.data import pl_Dataset, pl_pipe, pl_clip, pl_cs_zscore
 from qshare.data.universal import DataSpec
 from qshare.io.ddb import get_ddb_sess, reset_index_from_ddb
-from qshare.config.research.cta.features import HFFACTOR_COLS
 from .labels import get_blend_weights

+# HFFACTOR columns (defined inline - qshare.config.research not available)
+HFFACTOR_COLS = [
+    'vol_1min',
+    'skew_1min',
+    'volp_1min',
+    'volp_ratio_1min',
+    'voln_ratio_1min',
+    'trend_strength_1min',
+    'pv_corr_1min',
+    'flowin_ratio_1min',
+]
+

 @dataclass
 class CTA1DLoader:
@@ -97,9 +108,13 @@ class CTA1DLoader:
         df = df_features.join(df_label, on=['datetime', 'instrument'], how='inner')

         # Filter to requested date range
+        # Convert string dates to date objects for proper comparison
+        from datetime import datetime
+        start_dt = datetime.strptime(start_date, '%Y-%m-%d').date()
+        end_dt = datetime.strptime(end_date, '%Y-%m-%d').date()
         df = df.filter(
-            (pl.col('datetime') >= start_date) &
-            (pl.col('datetime') <= end_date)
+            (pl.col('datetime') >= start_dt) &
+            (pl.col('datetime') <= end_dt)
         )

         # Calculate weights
@@ -175,12 +190,11 @@ class CTA1DLoader:
         since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d')

         # Load from factor table
-        df = sess.run(f"""
-        select code, m_nDate, factor_name, value
+        factor_list = ','.join([f"'{c}'" for c in HFFACTOR_COLS])
+        query = f"""select code, m_nDate, factor_name, value
         from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hffactor')
-        where m_nDate >= {since_ddb}
-        and factor_name in [{','.join([f"'{c}'" for c in HFFACTOR_COLS])}]
-        """)
+        where m_nDate >= {since_ddb} and factor_name in [{factor_list}]"""
+        df = sess.run(query)

         # Pivot to wide format
         df = df.pivot_table(
@@ -263,20 +277,16 @@ class CTA1DLoader:
         """Apply specified normalization to label."""
         fit_start, fit_end = fit_range

-        # Ensure datetime column is string for comparison
-        if pl_df['datetime'].dtype == pl.Date:
-            pl_df = pl_df.with_columns(
-                pl.col('datetime').dt.strftime('%Y-%m-%d').alias('datetime_str')
-            )
-            date_col = 'datetime_str'
-        else:
-            date_col = 'datetime'
+        # Convert fit_range strings to date objects for comparison
+        from datetime import datetime
+        fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
+        fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()

         if self.normalization == 'zscore':
             # Calculate mean/std on fit range
             fit_data = pl_df.filter(
-                (pl.col(date_col) >= fit_start) &
-                (pl.col(date_col) <= fit_end)
+                (pl.col('datetime') >= fit_start_date) &
+                (pl.col('datetime') <= fit_end_date)
             )
             mean = fit_data['ret'].mean()
             std = fit_data['ret'].std()
@@ -334,17 +344,15 @@ class CTA1DLoader:
         """Create z-score normalized label."""
         fit_start, fit_end = fit_range

-        # Handle date type conversion for comparison
-        if pl_df['datetime'].dtype == pl.Date:
-            fit_data = pl_df.filter(
-                (pl.col('datetime').dt.strftime('%Y-%m-%d') >= fit_start) &
-                (pl.col('datetime').dt.strftime('%Y-%m-%d') <= fit_end)
-            )
-        else:
-            fit_data = pl_df.filter(
-                (pl.col('datetime') >= fit_start) &
-                (pl.col('datetime') <= fit_end)
-            )
+        # Convert fit_range strings to date objects for comparison
+        from datetime import datetime
+        fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
+        fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()
+
+        fit_data = pl_df.filter(
+            (pl.col('datetime') >= fit_start_date) &
+            (pl.col('datetime') <= fit_end_date)
+        )

         mean = fit_data['ret'].mean()
         std = fit_data['ret'].std()

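The recurring change in this file is comparing a polars `Date` column against Python `date` objects instead of strings. A self-contained illustration of the fixed pattern (toy data, not from the repo):

```python
from datetime import date, datetime

import polars as pl

df = pl.DataFrame({'datetime': [date(2023, 1, 2), date(2023, 1, 3)]})

# Parse the config-style string once, then compare date against date
start_dt = datetime.strptime('2023-01-03', '%Y-%m-%d').date()
print(df.filter(pl.col('datetime') >= start_dt).height)  # 1
```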
cta_1d/src/loader_parquet.py
@@ -0,0 +1,427 @@
"""CTA 1D loader using Parquet/Polars (parallel to CTA1DLoader)."""
from dataclasses import dataclass
from datetime import date, datetime
from typing import List, Optional

import numpy as np
import pandas as pd
import polars as pl

from qshare.data import pl_Dataset
from qshare.io.polars import load_from_pq

from .labels import get_blend_weights

# HFFACTOR columns (defined inline to avoid qshare.config.research dependency)
HFFACTOR_COLS = [
    'vol_1min',
    'skew_1min',
    'volp_1min',
    'volp_ratio_1min',
    'voln_ratio_1min',
    'trend_strength_1min',
    'pv_corr_1min',
    'flowin_ratio_1min',
]


@dataclass
class CTA1DLoaderParquet:
    """CTA 1D loader using Parquet files instead of DolphinDB.

    This is a parallel implementation to CTA1DLoader that reads from
    pre-exported Parquet tables instead of querying DolphinDB directly.

    Example:
        >>> loader = CTA1DLoaderParquet(
        ...     return_type='o2c_twap1min',
        ...     normalization='dual',
        ...     feature_sets=['alpha158', 'hffactor']
        ... )
        >>> dataset = loader.load(dt_range=['2020-01-01', '2023-12-31'])
        >>> dataset = dataset.with_segments({
        ...     'train': ('2020-01-01', '2022-12-31'),
        ...     'test': ('2023-01-01', '2023-12-31')
        ... })
        >>> X_train, y_train, w_train = dataset.split('train').to_numpy()
    """
    return_type: str = 'o2c_twap1min'
    normalization: str = 'dual'
    feature_sets: List[str] = None
    weight_factors: dict = None
    blend_weights: str | List[float] | None = None

    # Parquet paths (populated by data_ops_research)
    feature_path: str = '/data/parquet/dataset/cta_alpha158_1d'
    hffactor_path: str = '/data/parquet/dataset/cta_hffactor_1d'
    dom_path: str = '/data/parquet/dataset/cta_dom_1d'
    label_path: str = '/data/parquet/dataset/cta_labels_1d'

    label_cap_upper: float = 0.5
    label_cap_lower: float = -0.5

    def __post_init__(self):
        if self.feature_sets is None:
            self.feature_sets = ['alpha158', 'hffactor']
        if self.weight_factors is None:
            self.weight_factors = {'positive': 1.0, 'negative': 2.0}

    def load(
        self,
        dt_range: List[str],
        fit_range: Optional[List[str]] = None
    ) -> pl_Dataset:
        """Load and prepare CTA 1-day training dataset from Parquet files.

        Args:
            dt_range: Date range [start_date, end_date] for dataset
            fit_range: Date range [start, end] for fitting normalization params.
                If None, uses first 60% of dt_range.

        Returns:
            pl_Dataset with features, label, and weight columns
        """
        start_date, end_date = dt_range

        if fit_range is None:
            # Default: use first 60% for fit
            all_dates = pd.date_range(start_date, end_date)
            split_idx = int(len(all_dates) * 0.6)
            fit_range = [
                all_dates[0].strftime('%Y-%m-%d'),
                all_dates[split_idx].strftime('%Y-%m-%d')
            ]

        # Load extended history for rolling normalization
        load_start = (pd.to_datetime(start_date) - pd.Timedelta(days=120)).strftime('%Y-%m-%d')

        # Load features
        df_features = self._load_features(load_start, end_date)

        # Load and normalize labels
        df_label = self._load_labels(load_start, end_date, fit_range)

        # Combine
        df = df_features.join(df_label, on=['datetime', 'instrument'], how='inner')

        # Filter to requested date range
        start_dt = datetime.strptime(start_date, '%Y-%m-%d').date()
        end_dt = datetime.strptime(end_date, '%Y-%m-%d').date()
        df = df.filter(
            (pl.col('datetime') >= start_dt) &
            (pl.col('datetime') <= end_dt)
        )

        # Calculate weights
        df = self._calculate_weights(df)

        # Clean data
        df = self._clean_data(df)

        # Get feature columns
        feature_cols = [c for c in df.columns
                        if any(c.startswith(prefix) for prefix in ['f_a158_', 'f_hf_'])]

        return pl_Dataset(
            data=df,
            features=feature_cols,
            label='label',
            weight='weight' if self.weight_factors else None
        )

    def _load_features(self, start_date: str, end_date: str) -> pl.DataFrame:
        """Load feature data from Parquet files."""
        feature_dfs = []

        if 'alpha158' in self.feature_sets:
            df_alpha = self._load_alpha158(start_date, end_date)
            feature_dfs.append(df_alpha)

        if 'hffactor' in self.feature_sets:
            df_hf = self._load_hffactor(start_date, end_date)
            feature_dfs.append(df_hf)

        # Join all feature sets
        result = feature_dfs[0]
        for df in feature_dfs[1:]:
            result = result.join(df, on=['datetime', 'instrument'], how='inner')
        return result

    def _load_alpha158(self, start_date: str, end_date: str) -> pl.DataFrame:
        """Load alpha158 features from Parquet."""
        # Load using qshare's parquet loader
        df = load_from_pq(
            self.feature_path,
            start_date=start_date,
            end_date=end_date
        )

        # Drop non-numeric columns if present
        if 'code_init' in df.columns:
            df = df.drop('code_init')

        # Rename columns with prefix
        rename_map = {
            c: f'f_a158_{c}'
            for c in df.columns
            if c not in ['datetime', 'instrument']
        }
        df = df.rename(rename_map)
        return df

    def _load_hffactor(self, start_date: str, end_date: str) -> pl.DataFrame:
        """Load hffactor features from Parquet (already pivoted)."""
        # Load using qshare's parquet loader
        df = load_from_pq(
            self.hffactor_path,
            start_date=start_date,
            end_date=end_date
        )

        # Rename columns with prefix
        rename_map = {
            c: f'f_hf_{c}'
            for c in df.columns
            if c not in ['datetime', 'instrument']
        }
        df = df.rename(rename_map)
        return df

    def _load_labels(
        self,
        start_date: str,
        end_date: str,
        fit_range: List[str]
    ) -> pl.DataFrame:
        """Load and normalize labels from Parquet files."""
        # Map return type to indicator name
        indicator_map = {
            'o2c_twap1min': 'twap_open1m@1_twap_close1m@1',
            'o2o_twap1min': 'twap_open1m@1_twap_open1m@2',
        }
        indicator = indicator_map.get(self.return_type, self.return_type)

        # Load dominant contract mapping from Parquet
        df_contract = load_from_pq(
            self.dom_path,
            start_date=start_date,
            end_date=end_date
        )

        # Load returns from Parquet
        df_return = load_from_pq(
            self.label_path,
            start_date=start_date,
            end_date=end_date
        )

        # Filter for the specific indicator
        df_return = df_return.filter(pl.col('indicator') == indicator)

        # Merge with dominant contract mapping
        df_return = df_return.join(
            df_contract,
            on=['datetime', 'instrument'],
            how='inner'
        )

        # Use code_init as the instrument (continuous contract)
        df_return = df_return.with_columns(
            (pl.col('code_init') + 'Ind').alias('instrument')
        )

        # Select and rename return column
        df_return = df_return.select([
            'datetime',
            'instrument',
            pl.col('value').alias('ret')
        ])

        # Apply normalization
        df_return = self._normalize_label(df_return, fit_range)
        return df_return

    def _normalize_label(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame:
        """Apply specified normalization to label."""
        fit_start, fit_end = fit_range

        # Convert fit_range strings to date objects for comparison
        fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
        fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()

        if self.normalization == 'zscore':
            # Calculate mean/std on fit range
            fit_data = pl_df.filter(
                (pl.col('datetime') >= fit_start_date) &
                (pl.col('datetime') <= fit_end_date)
            )
            mean = fit_data['ret'].mean()
            std = fit_data['ret'].std()
            result = pl_df.with_columns(
                ((pl.col('ret') - mean) / std).clip(
                    self.label_cap_lower, self.label_cap_upper
                ).alias('label')
            ).select(['datetime', 'instrument', 'label'])
            return result

        elif self.normalization == 'cs_zscore':
            # Cross-sectional z-score per datetime
            return pl_df.with_columns(
                ((pl.col('ret') - pl.col('ret').mean().over('datetime')) /
                 pl.col('ret').std().over('datetime')).clip(
                    self.label_cap_lower, self.label_cap_upper
                ).alias('label')
            ).select(['datetime', 'instrument', 'label'])

        elif self.normalization == 'rolling_20':
            return self._apply_rolling_norm(pl_df, window=20, fit_range=fit_range)

        elif self.normalization == 'rolling_60':
            return self._apply_rolling_norm(pl_df, window=60, fit_range=fit_range)

        elif self.normalization == 'dual':
            # Create all normalization variants
            label_zscore = self._normalize_zscore(pl_df, fit_range)
            label_cszscore = self._normalize_cs_zscore(pl_df)
            label_roll20 = self._normalize_rolling(pl_df, window=20, fit_range=fit_range)
            label_roll60 = self._normalize_rolling(pl_df, window=60, fit_range=fit_range)

            # Get blend weights
            weights = get_blend_weights(self.blend_weights)

            # Join and blend
            pl_df = label_zscore.join(label_cszscore, on=['datetime', 'instrument'])
            pl_df = pl_df.join(label_roll20, on=['datetime', 'instrument'])
            pl_df = pl_df.join(label_roll60, on=['datetime', 'instrument'])
            return pl_df.with_columns(
                (weights[0] * pl.col('label_zscore') +
                 weights[1] * pl.col('label_cszscore') +
                 weights[2] * pl.col('label_roll20') +
                 weights[3] * pl.col('label_roll60')).clip(
                    self.label_cap_lower, self.label_cap_upper
                ).alias('label')
            ).select(['datetime', 'instrument', 'label'])

        else:
            raise ValueError(f"Unknown normalization: {self.normalization}")

    def _normalize_zscore(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame:
        """Create z-score normalized label."""
        fit_start, fit_end = fit_range
        fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
        fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()

        fit_data = pl_df.filter(
            (pl.col('datetime') >= fit_start_date) &
            (pl.col('datetime') <= fit_end_date)
        )
        mean = fit_data['ret'].mean()
        std = fit_data['ret'].std()
        return pl_df.with_columns(
            ((pl.col('ret') - mean) / std).alias('label_zscore')
        ).select(['datetime', 'instrument', 'label_zscore'])

    def _normalize_cs_zscore(self, pl_df: pl.DataFrame) -> pl.DataFrame:
        """Create cross-sectional z-score normalized label."""
        return pl_df.with_columns(
            ((pl.col('ret') - pl.col('ret').mean().over('datetime')) /
             pl.col('ret').std().over('datetime')).alias('label_cszscore')
        ).select(['datetime', 'instrument', 'label_cszscore'])

    def _normalize_rolling(
        self,
        pl_df: pl.DataFrame,
        window: int,
        fit_range: List[str]
    ) -> pl.DataFrame:
        """Create rolling window normalized label."""
        # Convert to pandas for rolling calculation
        pd_df = pl_df.to_pandas().set_index(['datetime', 'instrument'])

        # Unstack to wide format
        df_wide = pd_df['ret'].unstack('instrument')

        # Calculate rolling mean and std
        rolling_mean = df_wide.rolling(window=window, min_periods=window//2).mean()
        rolling_std = df_wide.rolling(window=window, min_periods=window//2).std()

        # Normalize
        df_normalized = (df_wide - rolling_mean) / rolling_std

        # Restack
        rolling_label = df_normalized.stack().reset_index()
        rolling_label.columns = ['datetime', 'instrument', f'label_roll{window}']
        return pl.from_pandas(rolling_label)

    def _apply_rolling_norm(
        self,
        pl_df: pl.DataFrame,
        window: int,
        fit_range: List[str]
    ) -> pl.DataFrame:
        """Apply rolling normalization and cap."""
        result = self._normalize_rolling(pl_df, window, fit_range)
        return result.with_columns(
            pl.col(f'label_roll{window}').clip(
                self.label_cap_lower, self.label_cap_upper
            ).alias('label')
        ).select(['datetime', 'instrument', 'label'])

    def _calculate_weights(self, pl_df: pl.DataFrame) -> pl.DataFrame:
        """Calculate sample weights based on return magnitude."""
        # Base weights by return magnitude tiers
        pl_df = pl_df.with_columns(
            pl.when(pl.col('label').abs() > 1.5).then(pl.lit(2.5))
            .when(pl.col('label').abs() > 1.0).then(pl.lit(2.0))
            .when(pl.col('label').abs() > 0.5).then(pl.lit(1.5))
            .when(pl.col('label').abs() > 0.2).then(pl.lit(1.0))
            .otherwise(0.0).alias('weight')
        )

        # Apply negative return multiplier
        if self.weight_factors.get('negative'):
            pl_df = pl_df.with_columns(
                pl.when(pl.col('label') < -0.5)
                .then(pl.col('weight') * self.weight_factors['negative'])
                .otherwise(pl.col('weight'))
                .alias('weight')
            )

        # Apply positive return multiplier
        if self.weight_factors.get('positive'):
            pl_df = pl_df.with_columns(
                pl.when(pl.col('label') > 0.5)
                .then(pl.col('weight') * self.weight_factors['positive'])
                .otherwise(pl.col('weight'))
                .alias('weight')
            )

        return pl_df

    def _clean_data(self, pl_df: pl.DataFrame) -> pl.DataFrame:
        """Clean data: remove inf/nan values."""
        # Get numeric columns only
        numeric_cols = [
            c for c in pl_df.columns
            if pl_df[c].dtype in [pl.Float32, pl.Float64, pl.Int32, pl.Int64]
        ]

        # Replace inf with null, then drop nulls
        pl_df = pl_df.with_columns([
            pl.when(pl.col(c).is_infinite()).then(None).otherwise(pl.col(c)).alias(c)
            for c in numeric_cols
        ])
        return pl_df.drop_nulls()

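To make the weight tiers in `_calculate_weights` concrete, a toy check (hypothetical label values; assumes the package import path used elsewhere in this commit):

```python
import polars as pl

from cta_1d.src.loader_parquet import CTA1DLoaderParquet

loader = CTA1DLoaderParquet()  # defaults: positive=1.0, negative=2.0
toy = pl.DataFrame({'label': [0.1, 0.3, -0.6, 1.2, -1.8]})
print(loader._calculate_weights(toy)['weight'].to_list())
# Tier rules above give: [0.0, 1.0, 1.5 * 2.0, 2.0, 2.5 * 2.0]
```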
data_ops_research/cta_1d/README.md
@@ -0,0 +1,44 @@
# CTA 1D Parquet Dataset
This directory contains requirements for CTA (Commodity Trading Advisor) futures
Parquet datasets used by alpha_lab.
## Tables
### cta_alpha158_1d
Alpha158 features for CTA futures.
- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_alpha159_0_7_beta`
- **Output**: `/data/parquet/dataset/cta_alpha158_1d/`
- **Columns**: ~163 feature columns + code, m_nDate
### cta_hffactor_1d
High-frequency factor features (8 columns).
- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_hffactor`
- **Output**: `/data/parquet/dataset/cta_hffactor_1d/`
- **Transformation**: Pivot from long to wide format (see the pandas sketch below)
- Input columns: code, m_nDate, factor_name, value
- Output columns: code, m_nDate, vol_1min, skew_1min, ... (8 features)
- **Filter**: Only include factor_name in [vol_1min, skew_1min, volp_1min,
volp_ratio_1min, voln_ratio_1min, trend_strength_1min, pv_corr_1min,
flowin_ratio_1min]
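The pivot above is roughly equivalent to the following pandas sketch (the actual export logic lives in data_ops_research and may differ; the input filename is hypothetical):

```python
import pandas as pd

# Long format: code, m_nDate, factor_name, value
df_long = pd.read_parquet('cta_hffactor_long.parquet')  # hypothetical input

factors = ['vol_1min', 'skew_1min', 'volp_1min', 'volp_ratio_1min',
           'voln_ratio_1min', 'trend_strength_1min', 'pv_corr_1min',
           'flowin_ratio_1min']
df_long = df_long[df_long['factor_name'].isin(factors)]

# One row per (code, m_nDate), one column per factor
df_wide = df_long.pivot_table(
    index=['code', 'm_nDate'],
    columns='factor_name',
    values='value',
).reset_index()
```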
### cta_dom_1d
Dominant contract mapping for continuous contracts.
- **Source**: `dfs://daily_stock_run.dwm_1day_cta_dom`
- **Output**: `/data/parquet/dataset/cta_dom_1d/`
- **Filter**: version = 'vp_csmax_roll2_cummax'
- **Aggregation**: GROUP BY m_nDate, code_init; SELECT first(code) as code (see the pandas sketch below)
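In pandas terms the aggregation is roughly (a sketch, not the export implementation; the input filename is hypothetical):

```python
import pandas as pd

df = pd.read_parquet('cta_dom_long.parquet')  # hypothetical input
df = df[df['version'] == 'vp_csmax_roll2_cummax']

# One dominant contract (first code) per continuous symbol per day
df_dom = df.groupby(['m_nDate', 'code_init'], as_index=False)['code'].first()
```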
### cta_labels_1d
Return labels for different return types.
- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_hfvalue`
- **Output**: `/data/parquet/dataset/cta_labels_1d/`
- **Filter**: indicator in [twap_open1m@1_twap_close1m@1, twap_open1m@1_twap_open1m@2]
- **Columns**: code, m_nDate, indicator, value
## Consumer
Used by: `alpha_lab/cta_1d/src/loader_parquet.py`
The alpha_lab project will create a parallel loader that reads from these
Parquet tables instead of DolphinDB.
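A quick sanity check of an exported table before pointing the loader at it (the glob layout under each output directory is an assumption; adjust to the actual partitioning):

```python
import polars as pl

df = pl.read_parquet('/data/parquet/dataset/cta_hffactor_1d/**/*.parquet')

expected = {'code', 'm_nDate', 'vol_1min', 'skew_1min', 'volp_1min',
            'volp_ratio_1min', 'voln_ratio_1min', 'trend_strength_1min',
            'pv_corr_1min', 'flowin_ratio_1min'}
print(df.shape, 'missing:', expected - set(df.columns) or 'none')
```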

data_ops_research/cta_1d/requirements.yaml
@@ -0,0 +1,88 @@
# CTA 1D Parquet Dataset Requirements
# This file specifies the required Parquet tables for alpha_lab CTA 1D task

# Table 1: Alpha158 Features
cta_alpha158_1d:
  source:
    database: dfs://daily_stock_run
    table: stg_1day_tinysoft_cta_alpha159_0_7_beta
    host: 192.168.1.146
    port: 8848
  target:
    path: cta_alpha158_1d/
    partition_freq: 1D
    col_datetime: m_nDate
    code_format: tscode
  description: Alpha158 features for CTA futures (~163 columns)
  priority: medium

# Table 2: HFFactor Features (requires pivot)
cta_hffactor_1d:
  source:
    database: dfs://daily_stock_run
    table: stg_1day_tinysoft_cta_hffactor
    host: 192.168.1.146
    port: 8848
    # Long format: code, m_nDate, factor_name, value
    # Pivot to wide format during export
    pivot:
      index: [code, m_nDate]
      columns: factor_name
      values: value
      filter:  # Only these 8 columns needed
        - vol_1min
        - skew_1min
        - volp_1min
        - volp_ratio_1min
        - voln_ratio_1min
        - trend_strength_1min
        - pv_corr_1min
        - flowin_ratio_1min
  target:
    path: cta_hffactor_1d/
    partition_freq: 1D
    col_datetime: m_nDate
    code_format: tscode
  description: High-frequency factor features (8 columns, pivoted from long format)
  priority: medium
  notes: Requires pivot transformation from long to wide format

# Table 3: Dominant Contract Mapping
cta_dom_1d:
  source:
    database: dfs://daily_stock_run
    table: dwm_1day_cta_dom
    host: 192.168.1.146
    port: 8848
    # Group and aggregate during export
    group_by: [m_nDate, code_init]
    filter: "version='vp_csmax_roll2_cummax'"
    agg: "first(code) as code"
  target:
    path: cta_dom_1d/
    partition_freq: 1D
    col_datetime: m_nDate
    code_format: tscode
  description: Dominant contract mapping for continuous contracts
  priority: medium
  notes: Requires group_by + aggregation, filter by version

# Table 4: Return Labels
cta_labels_1d:
  source:
    database: dfs://daily_stock_run
    table: stg_1day_tinysoft_cta_hfvalue
    host: 192.168.1.146
    port: 8848
    # Filter for specific indicators
    indicators:
      - twap_open1m@1_twap_close1m@1  # o2c_twap1min
      - twap_open1m@1_twap_open1m@2  # o2o_twap1min
  target:
    path: cta_labels_1d/
    partition_freq: 1D
    col_datetime: m_nDate
    code_format: tscode
  description: Return labels for different return types
  priority: medium
  notes: Filter indicator column for specific return types