# New FactorComboStrategy class (strategies/factor_combo.py) implements
# 8 champion factor signals (4 US, 4 CN) discovered through iterative
# factor research, each at 4 rebalancing frequencies (daily/weekly/
# biweekly/monthly). Registered in trader.py as fc_{signal}_{freq}.
# Existing strategies and state files are untouched — safe to git pull
# and restart monitor on server.
# Also includes factor research scripts (factor_loop.py, factor_research.py,
# etc.) used to discover and validate these factors.
"""
|
|
Deep factor analysis — orthogonality, proper correlations, residual alpha.
|
|
|
|
For the top factor candidates identified in factor_research.py, this script:
|
|
1. Computes proper daily cross-sectional rank correlations between factors
|
|
2. Tests residual IC after neutralizing known factors (momentum, recovery)
|
|
3. Runs sub-period breakdown (2-year windows)
|
|
4. Tests factor combinations
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import warnings
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
import data_manager
|
|
from universe import UNIVERSES
|
|
from factor_research import (
|
|
factor_momentum_12_1,
|
|
factor_recovery,
|
|
factor_inverse_vol,
|
|
factor_short_term_reversal,
|
|
factor_idio_vol_change,
|
|
factor_max_drawdown_recovery,
|
|
factor_mean_reversion_residual,
|
|
factor_skewness,
|
|
factor_high_low_range as factor_range_compression,
|
|
factor_52w_high_distance as factor_near_52w_high,
|
|
factor_downside_beta_proxy as factor_low_downside_beta,
|
|
factor_lottery_demand,
|
|
factor_turnover_reversal,
|
|
factor_gap_momentum as factor_smooth_momentum,
|
|
factor_up_down_vol_ratio,
|
|
factor_trend_strength,
|
|
factor_consecutive_up_days,
|
|
factor_volume_price_divergence,
|
|
factor_recovery_acceleration,
|
|
factor_relative_volume_momentum,
|
|
factor_price_level,
|
|
factor_liquidity_premium,
|
|
compute_ic,
|
|
)
|
|
|
|
warnings.filterwarnings("ignore", category=FutureWarning)
|
|
|
|
|
|
def daily_cross_sectional_correlation(
    sig_a: pd.DataFrame, sig_b: pd.DataFrame
) -> pd.Series:
    """Per-date cross-sectional Spearman correlation between two factor panels.

    Only dates and columns present in both inputs are used. A date contributes
    a value only when at least 30 names have data in both panels on that date
    and the resulting correlation is finite.
    """
    shared_dates = sig_a.index.intersection(sig_b.index)
    shared_names = sig_a.columns.intersection(sig_b.columns)
    left = sig_a.loc[shared_dates, shared_names]
    right = sig_b.loc[shared_dates, shared_names]

    out: dict = {}
    for day in shared_dates:
        row_l = left.loc[day].dropna()
        row_r = right.loc[day].dropna()
        both = row_l.index.intersection(row_r.index)
        # Require a reasonable cross-section before trusting the estimate.
        if len(both) < 30:
            continue
        rho = row_l[both].corr(row_r[both], method="spearman")
        if np.isfinite(rho):
            out[day] = rho
    return pd.Series(out)
|
|
|
|
|
|
def proper_factor_correlation_matrix(factors: dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Average daily cross-sectional Spearman correlation for every factor pair.

    Returns a symmetric matrix with 1.0 on the diagonal. A pair with no
    qualifying dates (see daily_cross_sectional_correlation) gets NaN.
    """
    labels = list(factors)
    out = pd.DataFrame(1.0, index=labels, columns=labels)

    # Fill the upper triangle and mirror it — the measure is symmetric.
    for pos, name_a in enumerate(labels):
        for name_b in labels[pos + 1:]:
            daily = daily_cross_sectional_correlation(factors[name_a], factors[name_b])
            value = daily.mean() if len(daily) > 0 else np.nan
            out.loc[name_a, name_b] = value
            out.loc[name_b, name_a] = value

    return out
|
|
|
|
|
|
def residual_signal(
    target: pd.DataFrame,
    controls: list[pd.DataFrame],
) -> pd.DataFrame:
    """Orthogonalize *target* against *controls*, one cross-section at a time.

    Target and controls are first converted to cross-sectional percentile
    ranks. For each date, the target ranks are regressed (with intercept) on
    the control ranks and the OLS residual is returned. Dates with no control
    data pass the target ranks through unchanged; dates with fewer than 30
    usable names are left as NaN.
    """
    y_ranks = target.rank(axis=1, pct=True, na_option="keep")
    x_ranks = [ctrl.rank(axis=1, pct=True, na_option="keep") for ctrl in controls]

    out = pd.DataFrame(index=target.index, columns=target.columns, dtype=float)

    for day in target.index:
        y_row = y_ranks.loc[day].dropna()
        x_rows = [xr.loc[day].reindex(y_row.index) for xr in x_ranks if day in xr.index]

        if not x_rows:
            # Nothing to neutralize against on this date: pass ranks through.
            out.loc[day] = y_row
            continue

        design = pd.concat(x_rows, axis=1).dropna()
        usable = y_row.index.intersection(design.index)
        if len(usable) < 30:
            continue

        y_vec = y_row[usable].values
        # Prepend an intercept column to the control ranks.
        x_mat = np.column_stack([np.ones(len(usable)), design.loc[usable].values])
        try:
            beta, _, _, _ = np.linalg.lstsq(x_mat, y_vec, rcond=None)
            out.loc[day, usable] = y_vec - x_mat @ beta
        except np.linalg.LinAlgError:
            # Degenerate design matrix: fall back to the raw ranks.
            out.loc[day, usable] = y_row[usable].values

    return out
|
|
|
|
|
|
def subperiod_ic(signal: pd.DataFrame, prices: pd.DataFrame, horizon: int = 5, window_years: int = 2):
    """Break the IC series of *signal* into consecutive sub-period statistics.

    Forward returns over *horizon* days are computed from *prices*, the daily
    IC series is obtained via ``compute_ic``, and the series is sliced into
    consecutive ``window_years``-year calendar windows. Windows with 100 or
    fewer IC observations are skipped.

    Returns a DataFrame with one row per window: period label, IC mean/std,
    ICIR, share of positive-IC days, and sample size. Empty DataFrame when
    no IC values exist.
    """
    fwd_ret = prices.pct_change(horizon).shift(-horizon)
    ic_series = compute_ic(signal, fwd_ret)
    if len(ic_series) == 0:
        return pd.DataFrame()

    # NOTE(fix): removed dead local `window = 252 * window_years` — the loop
    # below windows by calendar DateOffset, and the variable was never read.
    results = []
    start = ic_series.index[0]
    while start < ic_series.index[-1]:
        end = start + pd.DateOffset(years=window_years)
        subset = ic_series[(ic_series.index >= start) & (ic_series.index < end)]
        # Require a meaningful sample before reporting window statistics.
        if len(subset) > 100:
            results.append({
                "period": f"{start.year}-{end.year}",
                "ic_mean": subset.mean(),
                "ic_std": subset.std(),
                "icir": subset.mean() / subset.std() if subset.std() > 0 else 0,
                "pct_positive": (subset > 0).mean(),
                "n_days": len(subset),
            })
        start = end
    return pd.DataFrame(results)
|
|
|
|
|
|
def test_factor_combination(
    factors: dict[str, pd.DataFrame],
    factor_names: list[str],
    weights: list[float],
    prices: pd.DataFrame,
    label: str,
):
    """Score a weighted blend of ranked factors on 5-day forward returns.

    Each named factor is cross-sectionally percentile-ranked, scaled by its
    weight, and summed into a composite. The composite is evaluated via
    ``compute_ic`` against 5-day forward returns. Returns a dict with mean IC,
    ICIR, and rolling-1y IC stability (NaN when fewer than 253 IC days), or
    None when no IC values exist.
    """
    # Weighted sum of percentile-ranked factor panels.
    combo = sum(
        weight * factors[name].rank(axis=1, pct=True, na_option="keep")
        for name, weight in zip(factor_names, weights)
    )

    fwd_5d = prices.pct_change(5).shift(-5)
    ic_series = compute_ic(combo, fwd_5d)
    if len(ic_series) == 0:
        return None

    ic_mean = ic_series.mean()
    ic_std = ic_series.std()
    stability = (
        (ic_series.rolling(252).mean().dropna() > 0).mean()
        if len(ic_series) > 252
        else np.nan
    )
    return {
        "combo": label,
        "ic_5d": ic_mean,
        "icir_5d": ic_mean / ic_std if ic_std > 0 else 0,
        "ic_stab": stability,
    }
|
|
|
|
|
|
def run_analysis(market: str):
    """Run the full four-part factor diagnostic for one market.

    Args:
        market: Universe key ("us" or "cn") used to look up config in
            UNIVERSES and to select market-specific factors and combos.

    Prints (no return value):
        1. Average daily cross-sectional factor-rank correlation matrix.
        2. Residual ICs after neutralizing momentum_12_1 + recovery.
        3. Sub-period IC stability for the market's top factors.
        4. IC stats for hand-picked multi-factor composites.
    """
    config = UNIVERSES[market]
    benchmark = config["benchmark"]

    print(f"Loading {market.upper()} price data...")
    prices = data_manager.load(market)
    # Drop the benchmark column so only tradable names enter the factors.
    stocks = prices.drop(columns=[benchmark], errors="ignore")
    print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} days")

    # Build factors shared by both markets.
    print("Computing factors...")
    factors = {}
    factors["momentum_12_1"] = factor_momentum_12_1(stocks)
    factors["recovery"] = factor_recovery(stocks)
    factors["inverse_vol"] = factor_inverse_vol(stocks)
    factors["short_term_reversal"] = factor_short_term_reversal(stocks)
    factors["drawdown_recovery"] = factor_max_drawdown_recovery(stocks)
    factors["mean_rev_zscore"] = factor_mean_reversion_residual(stocks)
    factors["neg_skewness"] = factor_skewness(stocks)
    factors["near_52w_high"] = factor_near_52w_high(stocks)
    factors["low_downside_beta"] = factor_low_downside_beta(stocks)
    factors["smooth_momentum"] = factor_smooth_momentum(stocks)
    factors["recovery_accel"] = factor_recovery_acceleration(stocks)
    factors["range_compression"] = factor_range_compression(stocks)

    # CN-only candidates (volume/price microstructure style factors).
    if market == "cn":
        factors["anti_lottery"] = factor_lottery_demand(stocks)
        factors["vol_reversal"] = factor_turnover_reversal(stocks)
        factors["low_price"] = factor_price_level(stocks)
        factors["illiquidity"] = factor_liquidity_premium(stocks)

    # ---- 1. Proper Cross-Sectional Correlation Matrix ----
    print("\n" + "=" * 90)
    print(f" 1. CROSS-SECTIONAL FACTOR CORRELATIONS — {market.upper()}")
    print("=" * 90)
    print("(Average daily Spearman correlation between factor ranks)\n")

    corr = proper_factor_correlation_matrix(factors)
    print(corr.round(3).to_string())

    # ---- 2. Residual IC after neutralizing known factors ----
    print("\n" + "=" * 90)
    print(f" 2. RESIDUAL IC AFTER NEUTRALIZING KNOWN FACTORS — {market.upper()}")
    print("=" * 90)
    print("(IC of factor after cross-sectionally regressing out momentum + recovery)\n")

    # Controls: the two factors already in production use.
    known = [factors["momentum_12_1"], factors["recovery"]]
    fwd_5d = stocks.pct_change(5).shift(-5)

    # inverse_vol is also excluded — treated as known, not a new candidate.
    new_candidates = [k for k in factors if k not in ("momentum_12_1", "recovery", "inverse_vol")]
    rows = []
    for name in new_candidates:
        resid = residual_signal(factors[name], known)
        ic_series = compute_ic(resid, fwd_5d)
        if len(ic_series) > 0:
            rows.append({
                "factor": name,
                "raw_ic_5d": compute_ic(factors[name], fwd_5d).mean(),
                "residual_ic_5d": ic_series.mean(),
                "residual_icir_5d": ic_series.mean() / ic_series.std() if ic_series.std() > 0 else 0,
                "pct_pos": (ic_series > 0).mean(),
            })

    resid_df = pd.DataFrame(rows).set_index("factor").sort_values("residual_icir_5d", ascending=False)
    print(resid_df.round(4).to_string())

    # ---- 3. Sub-Period Stability ----
    print("\n" + "=" * 90)
    print(f" 3. SUB-PERIOD IC STABILITY (2-year windows, 5-day horizon) — {market.upper()}")
    print("=" * 90)

    # Test top factors (hand-picked per market from earlier research runs).
    if market == "us":
        top_factors = ["low_downside_beta", "drawdown_recovery", "mean_rev_zscore", "short_term_reversal", "momentum_12_1"]
    else:
        top_factors = ["momentum_12_1", "anti_lottery", "inverse_vol", "vol_reversal", "near_52w_high"]

    for name in top_factors:
        if name not in factors:
            continue
        print(f"\n {name}:")
        sp = subperiod_ic(factors[name], stocks, horizon=5)
        if not sp.empty:
            print(sp.to_string(index=False))
        else:
            print(" (insufficient data)")

    # ---- 4. Factor Combinations ----
    print("\n" + "=" * 90)
    print(f" 4. FACTOR COMBINATIONS — {market.upper()}")
    print("=" * 90)
    print("(Testing multi-factor composites)\n")

    combos = []
    if market == "us":
        tests = [
            (["momentum_12_1", "low_downside_beta"], [0.5, 0.5], "mom+low_dbeta"),
            (["momentum_12_1", "drawdown_recovery"], [0.5, 0.5], "mom+dd_recovery"),
            (["momentum_12_1", "mean_rev_zscore"], [0.5, 0.5], "mom+mean_rev"),
            (["momentum_12_1", "short_term_reversal"], [0.5, 0.5], "mom+STR"),
            (["recovery", "low_downside_beta"], [0.5, 0.5], "recovery+low_dbeta"),
            (["momentum_12_1", "recovery", "low_downside_beta"], [0.33, 0.33, 0.34], "mom+rec+low_dbeta"),
            (["momentum_12_1", "recovery", "drawdown_recovery"], [0.33, 0.33, 0.34], "mom+rec+dd_rec"),
            (["momentum_12_1", "recovery", "short_term_reversal"], [0.33, 0.33, 0.34], "mom+rec+STR"),
            (["momentum_12_1", "recovery", "mean_rev_zscore"], [0.33, 0.33, 0.34], "mom+rec+meanrev"),
            (["momentum_12_1", "recovery", "low_downside_beta", "short_term_reversal"],
             [0.25, 0.25, 0.25, 0.25], "mom+rec+dbeta+STR"),
            (["momentum_12_1", "recovery", "drawdown_recovery", "mean_rev_zscore"],
             [0.25, 0.25, 0.25, 0.25], "mom+rec+ddrec+meanrev"),
        ]
    else:  # cn
        tests = [
            (["momentum_12_1", "anti_lottery"], [0.5, 0.5], "mom+anti_lottery"),
            (["momentum_12_1", "inverse_vol"], [0.5, 0.5], "mom+inv_vol"),
            (["momentum_12_1", "vol_reversal"], [0.5, 0.5], "mom+vol_reversal"),
            (["momentum_12_1", "near_52w_high"], [0.5, 0.5], "mom+near52wh"),
            (["momentum_12_1", "anti_lottery", "inverse_vol"], [0.33, 0.33, 0.34], "mom+alot+invvol"),
            (["momentum_12_1", "anti_lottery", "vol_reversal"], [0.33, 0.33, 0.34], "mom+alot+volrev"),
            (["momentum_12_1", "anti_lottery", "near_52w_high"], [0.33, 0.33, 0.34], "mom+alot+near52w"),
            (["momentum_12_1", "recovery", "anti_lottery"], [0.33, 0.33, 0.34], "mom+rec+alot"),
            (["momentum_12_1", "anti_lottery", "inverse_vol", "vol_reversal"],
             [0.25, 0.25, 0.25, 0.25], "mom+alot+invvol+volrev"),
            (["momentum_12_1", "anti_lottery", "near_52w_high", "vol_reversal"],
             [0.25, 0.25, 0.25, 0.25], "mom+alot+52wh+volrev"),
        ]

    # Also test the existing recovery+momentum baseline
    baseline = test_factor_combination(factors, ["momentum_12_1", "recovery"], [0.5, 0.5], stocks, "BASELINE: mom+recovery")
    if baseline:
        combos.append(baseline)

    for names, weights, label in tests:
        # Skip combos referencing factors absent in this market.
        if all(n in factors for n in names):
            result = test_factor_combination(factors, names, weights, stocks, label)
            if result:
                combos.append(result)

    combo_df = pd.DataFrame(combos).set_index("combo").sort_values("icir_5d", ascending=False)
    print(combo_df.round(4).to_string())
|
|
|
|
|
|
def main():
    """CLI entry point: parse the target market and run the analysis."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--market", default="us", choices=["us", "cn"])
    opts = cli.parse_args()
    run_analysis(opts.market)


if __name__ == "__main__":
    main()
|