quant/factor_research.py
Gahow Wang ae25f2f6b5 Add 32 factor-combo strategies with configurable rebalancing frequency
New FactorComboStrategy class (strategies/factor_combo.py) implements
8 champion factor signals (4 US, 4 CN) discovered through iterative
factor research, each at 4 rebalancing frequencies (daily/weekly/
biweekly/monthly). Registered in trader.py as fc_{signal}_{freq}.
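(For example, a hypothetical signal named recovery rebalanced weekly would
register as fc_recovery_weekly.)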

Existing strategies and state files are untouched — safe to git pull
and restart monitor on server.

Also includes factor research scripts (factor_loop.py, factor_research.py,
etc.) used to discover and validate these factors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 10:41:34 +08:00

"""
Factor Research Script: professional quantitative-research (QR) style factor mining.
Tests candidate alpha factors using:
- Information Coefficient (IC): rank correlation of signal vs forward returns
- IC Information Ratio (ICIR): mean(IC) / std(IC), measures signal consistency
- Quintile long-short spread: monotonicity of returns across signal buckets
- Turnover: daily rank change, proxy for trading cost
- Decay profile: IC at 1d, 5d, 10d, 20d horizons
- Sub-period stability: IC consistency across rolling windows
- Factor correlation matrix: ensures new factors are orthogonal to known ones
Usage:
uv run python factor_research.py --market us
uv run python factor_research.py --market cn
"""
from __future__ import annotations
import argparse
import warnings
import numpy as np
import pandas as pd
import data_manager
from universe import UNIVERSES
warnings.filterwarnings("ignore", category=FutureWarning)
HORIZONS = [1, 5, 10, 20]
# ---------------------------------------------------------------------------
# Factor definitions — each returns a DataFrame (dates x stocks) of scores
# ---------------------------------------------------------------------------
def _safe_rank(df: pd.DataFrame) -> pd.DataFrame:
return df.rank(axis=1, pct=True, na_option="keep")
def _rolling_ret(prices: pd.DataFrame, window: int) -> pd.DataFrame:
return prices.pct_change(window)
# --- Known factors (baselines) ---
def factor_momentum_12_1(prices: pd.DataFrame) -> pd.DataFrame:
"""Classic 12-1 month momentum."""
return prices.shift(21).pct_change(231)
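# Note: shift(21).pct_change(231) spans 21 + 231 = 252 trading days, i.e.
# p[t-21] / p[t-252] - 1: the trailing year's return excluding the most recent
# month, the standard construction that sidesteps 1-month reversal.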
def factor_recovery(prices: pd.DataFrame) -> pd.DataFrame:
"""Price / 63-day low - 1."""
return prices / prices.rolling(63, min_periods=63).min() - 1
def factor_inverse_vol(prices: pd.DataFrame) -> pd.DataFrame:
"""Negative 60-day realized volatility (low vol = high score)."""
return -prices.pct_change().rolling(60, min_periods=60).std()
# --- NEW candidate factors ---
def factor_short_term_reversal(prices: pd.DataFrame) -> pd.DataFrame:
"""5-day return reversal. Hypothesis: short-term mean reversion."""
return -prices.pct_change(5)
def factor_idio_vol_change(prices: pd.DataFrame) -> pd.DataFrame:
"""Change in idiosyncratic volatility (20d vs 60d).
Hypothesis: declining vol = stabilizing, predicts positive returns."""
ret = prices.pct_change()
vol_20 = ret.rolling(20, min_periods=20).std()
vol_60 = ret.rolling(60, min_periods=60).std()
return -(vol_20 / vol_60.replace(0, np.nan) - 1) # negative = vol declining
def factor_volume_price_divergence(prices: pd.DataFrame, volume: pd.DataFrame | None = None) -> pd.DataFrame:
    """Return acceleration as a volume/price divergence proxy.
    `volume` is accepted for future use but currently unused; without volume
    data, return acceleration stands in for divergence."""
    ret_5 = prices.pct_change(5)
    ret_20 = prices.pct_change(20)
    return ret_5 - (ret_20 / 4)  # positive when the 5d return outpaces the pro-rated 20d trend
def factor_max_drawdown_recovery(prices: pd.DataFrame) -> pd.DataFrame:
"""How much of the 60-day max drawdown has been recovered.
Hypothesis: stocks that recover from drawdowns continue recovering."""
rolling_max = prices.rolling(60, min_periods=60).max()
drawdown = prices / rolling_max - 1 # negative
rolling_min_dd = drawdown.rolling(60, min_periods=20).min() # worst drawdown
recovery_pct = drawdown / rolling_min_dd.replace(0, np.nan)
return recovery_pct # closer to 0 = more recovered
def factor_skewness(prices: pd.DataFrame) -> pd.DataFrame:
"""Negative 20-day return skewness.
Hypothesis: negatively skewed stocks are overpriced (lottery preference)."""
ret = prices.pct_change()
return -ret.rolling(20, min_periods=20).skew()
def factor_high_low_range(prices: pd.DataFrame) -> pd.DataFrame:
"""20-day high-low range relative to price.
Hypothesis: narrow range = consolidation, breakout ahead."""
high_20 = prices.rolling(20, min_periods=20).max()
low_20 = prices.rolling(20, min_periods=20).min()
mid = (high_20 + low_20) / 2
return -(high_20 - low_20) / mid.replace(0, np.nan) # negative range = narrow = high score
def factor_mean_reversion_residual(prices: pd.DataFrame) -> pd.DataFrame:
"""Distance from 20-day MA as fraction of 60-day vol.
Hypothesis: stocks far below MA revert. Z-score style."""
ma_20 = prices.rolling(20, min_periods=20).mean()
vol_60 = prices.pct_change().rolling(60, min_periods=60).std() * prices
return -(prices - ma_20) / vol_60.replace(0, np.nan) # below MA = high score
def factor_up_down_vol_ratio(prices: pd.DataFrame) -> pd.DataFrame:
"""Ratio of upside to downside semi-deviation (20d).
Hypothesis: stocks with more upside vol have positive momentum."""
ret = prices.pct_change()
up_vol = ret.where(ret > 0, 0).rolling(20, min_periods=15).std()
down_vol = ret.where(ret < 0, 0).rolling(20, min_periods=15).std()
return up_vol / down_vol.replace(0, np.nan)
def factor_consecutive_up_days(prices: pd.DataFrame) -> pd.DataFrame:
"""Fraction of positive return days in last 10 days.
Hypothesis: persistent winners keep winning (short-term)."""
ret = prices.pct_change()
return (ret > 0).astype(float).rolling(10, min_periods=10).mean()
def factor_gap_momentum(prices: pd.DataFrame) -> pd.DataFrame:
    """Path-smoothness proxy (no intraday data, so true overnight gaps are
    unavailable): 20-day arithmetic sum of daily returns minus the compounded
    20-day return. The gap is roughly the path's variance drag.
    Hypothesis: smooth, consistent returns beat volatile ones."""
    ret_1d = prices.pct_change()
    smoothness = ret_1d.rolling(20, min_periods=20).mean() * 20  # arithmetic 20d return
    raw_20d = prices.pct_change(20)  # compounded 20d return
    return smoothness - raw_20d  # ~variance drag: larger for choppier paths
def factor_recovery_acceleration(prices: pd.DataFrame) -> pd.DataFrame:
    """Rate of change of the recovery factor.
    Hypothesis: accelerating recovery is a stronger signal than the level."""
    recovery = prices / prices.rolling(63, min_periods=63).min() - 1
    # diff rather than pct_change: recovery is exactly 0 at the 63d low,
    # where a percentage change would divide by zero
    return recovery.diff(5)
def factor_trend_strength(prices: pd.DataFrame) -> pd.DataFrame:
"""R-squared of log-price vs time over 60 days.
Hypothesis: stocks trending linearly (high R2) continue."""
log_p = np.log(prices.replace(0, np.nan))
def _r2(series):
y = series.dropna().values
if len(y) < 30:
return np.nan
x = np.arange(len(y), dtype=float)
x -= x.mean()
y_dm = y - y.mean()
ss_xy = (x * y_dm).sum()
ss_xx = (x * x).sum()
ss_yy = (y_dm * y_dm).sum()
if ss_xx == 0 or ss_yy == 0:
return np.nan
r2 = (ss_xy ** 2) / (ss_xx * ss_yy)
slope = ss_xy / ss_xx
return r2 if slope > 0 else -r2 # sign by direction
return log_p.rolling(60, min_periods=30).apply(_r2, raw=False)
def factor_relative_volume_momentum(prices: pd.DataFrame) -> pd.DataFrame:
"""Price momentum weighted by how 'cheap' a stock is relative to 52-week range.
Hypothesis: momentum in stocks near lows is more likely to persist."""
mom_20 = prices.pct_change(20)
high_252 = prices.rolling(252, min_periods=126).max()
low_252 = prices.rolling(252, min_periods=126).min()
position_in_range = (prices - low_252) / (high_252 - low_252).replace(0, np.nan)
return mom_20 * (1 - position_in_range) # momentum * cheapness
def factor_52w_high_distance(prices: pd.DataFrame) -> pd.DataFrame:
"""Distance from 52-week high.
Hypothesis: stocks near their highs continue (anchoring bias)."""
high_252 = prices.rolling(252, min_periods=126).max()
return prices / high_252 # closer to 1 = near high
def factor_downside_beta_proxy(prices: pd.DataFrame) -> pd.DataFrame:
"""Proxy for downside beta using co-movement on market down days.
Hypothesis: low downside beta outperforms (asymmetric risk)."""
ret = prices.pct_change()
market_ret = ret.mean(axis=1)
down_days = market_ret < 0
# Mask non-down-day returns to NaN, then rolling mean
# Use numpy for correct broadcasting, wider window (120d) so ~54 down
# days are available, well above min_periods=20
arr = ret.values.copy()
arr[~down_days.values, :] = np.nan
down_ret = pd.DataFrame(arr, index=ret.index, columns=ret.columns)
avg_down = down_ret.rolling(120, min_periods=20).mean()
return -avg_down # negative = less downside = good
# --- A-share specific factors ---
def factor_liquidity_premium(prices: pd.DataFrame) -> pd.DataFrame:
"""Amihud illiquidity proxy (using returns only, no volume).
Hypothesis: illiquid stocks earn premium in A-shares (retail driven)."""
ret = prices.pct_change()
# Use absolute return as illiquidity proxy (higher abs ret = less liquid)
illiq = ret.abs().rolling(20, min_periods=15).mean()
return illiq
def factor_lottery_demand(prices: pd.DataFrame) -> pd.DataFrame:
"""Max daily return in past 20 days (negative).
Hypothesis: lottery stocks (high max return) underperform.
Strong in A-shares due to retail speculation."""
ret = prices.pct_change()
return -ret.rolling(20, min_periods=15).max()
def factor_turnover_reversal(prices: pd.DataFrame) -> pd.DataFrame:
"""Interaction of short-term returns with volatility.
High vol + negative return = oversold bounce candidate.
Common A-share alpha source."""
ret_5 = prices.pct_change(5)
vol_20 = prices.pct_change().rolling(20, min_periods=15).std()
return -ret_5 * vol_20 # oversold + high vol = positive
def factor_price_level(prices: pd.DataFrame) -> pd.DataFrame:
"""Negative absolute price level.
Hypothesis: low-priced stocks attract retail in A-shares (penny stock effect)."""
return -prices
# ---------------------------------------------------------------------------
# IC and analytics engine
# ---------------------------------------------------------------------------
def compute_ic(
signal: pd.DataFrame,
forward_ret: pd.DataFrame,
) -> pd.Series:
"""Cross-sectional rank IC (Spearman) per day."""
common_idx = signal.index.intersection(forward_ret.index)
common_cols = signal.columns.intersection(forward_ret.columns)
sig = signal.loc[common_idx, common_cols]
fwd = forward_ret.loc[common_idx, common_cols]
ics = []
for date in common_idx:
s = sig.loc[date].dropna()
f = fwd.loc[date].dropna()
common = s.index.intersection(f.index)
if len(common) < 30:
continue
ic = s[common].corr(f[common], method="spearman")
if np.isfinite(ic):
ics.append((date, ic))
if not ics:
return pd.Series(dtype=float)
return pd.Series(dict(ics))
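# Worked sketch of one date's IC (hypothetical numbers): with
#   signal  = {"AAA": 0.9, "BBB": 0.1, "CCC": 0.5}
#   fwd_ret = {"AAA": 0.02, "BBB": -0.01, "CCC": 0.00}
# the signal orders the names exactly as their forward returns do, so the
# Spearman rank IC for that date is +1.0; in practice, daily ICs of a few
# hundredths are already considered meaningful.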
def compute_quintile_returns(
    signal: pd.DataFrame,
    forward_ret: pd.DataFrame,
    n_quantiles: int = 5,
    horizon: int = 5,
) -> pd.Series:
    """Average forward return by signal quintile, per day, then time-averaged.
    `horizon` is the span in days of `forward_ret`, used only to annualize."""
common_idx = signal.index.intersection(forward_ret.index)
common_cols = signal.columns.intersection(forward_ret.columns)
sig = signal.loc[common_idx, common_cols]
fwd = forward_ret.loc[common_idx, common_cols]
records = []
for date in common_idx:
s = sig.loc[date].dropna()
f = fwd.loc[date].dropna()
common = s.index.intersection(f.index)
if len(common) < 50:
continue
scores = s[common]
rets = f[common]
try:
quintile = pd.qcut(scores, n_quantiles, labels=False, duplicates="drop")
except ValueError:
continue
for q in range(n_quantiles):
mask = quintile == q
if mask.sum() > 0:
records.append({"date": date, "quintile": q + 1, "return": rets[mask].mean()})
    if not records:
        return pd.Series(dtype=float)
    df = pd.DataFrame(records)
    # Each observation is a `horizon`-day return, so annualize by 252 / horizon
    return df.groupby("quintile")["return"].mean() * (252 / horizon)
def compute_turnover(signal: pd.DataFrame) -> float:
"""Average daily rank change (turnover proxy)."""
ranked = signal.rank(axis=1, pct=True, na_option="keep")
daily_change = ranked.diff().abs().mean(axis=1)
return float(daily_change.mean())
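# Interpretation (rule of thumb, not a measured result): a turnover of 0.05
# means the average name drifts ~5 percentile points of signal rank per day,
# a rough proxy for how much a rank-weighted book would churn, and thus cost.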
def compute_factor_correlation(factors: dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Time-averaged cross-sectional rank correlation between all factor pairs."""
    names = list(factors.keys())
    n = len(names)
    corr_matrix = pd.DataFrame(np.nan, index=names, columns=names)
    # Rank each factor cross-sectionally first, so a plain correlation of the
    # ranks equals a Spearman correlation of the raw scores. (Correlating
    # time series of cross-sectionally averaged ranks would be degenerate:
    # the mean of percentile ranks is ~0.5 on every date.)
    ranked = {name: sig.rank(axis=1, pct=True, na_option="keep") for name, sig in factors.items()}
    for i in range(n):
        corr_matrix.loc[names[i], names[i]] = 1.0
        for j in range(i + 1, n):
            r1, r2 = ranked[names[i]], ranked[names[j]]
            common_idx = r1.index.intersection(r2.index)
            common_cols = r1.columns.intersection(r2.columns)
            if len(common_cols) < 2:
                continue
            # Per-date cross-sectional correlation, then average over time
            daily_corr = r1.loc[common_idx, common_cols].corrwith(
                r2.loc[common_idx, common_cols], axis=1
            )
            if daily_corr.notna().sum() > 100:
                c = float(daily_corr.mean())
                corr_matrix.loc[names[i], names[j]] = c
                corr_matrix.loc[names[j], names[i]] = c
    return corr_matrix
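# Heuristic (an assumption, not a rule enforced by this codebase): pairwise
# |corr| below roughly 0.3 is commonly treated as orthogonal enough for a new
# factor to diversify rather than repackage an existing signal.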
def analyze_factor(
name: str,
signal: pd.DataFrame,
prices: pd.DataFrame,
horizons: list[int] | None = None,
) -> dict:
"""Full single-factor analysis."""
if horizons is None:
horizons = HORIZONS
results = {"name": name}
# Forward returns at each horizon
for h in horizons:
fwd_ret = prices.pct_change(h).shift(-h)
ic_series = compute_ic(signal, fwd_ret)
if len(ic_series) == 0:
results[f"ic_{h}d"] = np.nan
results[f"icir_{h}d"] = np.nan
continue
ic_mean = ic_series.mean()
ic_std = ic_series.std()
icir = ic_mean / ic_std if ic_std > 0 else 0.0
results[f"ic_{h}d"] = ic_mean
results[f"icir_{h}d"] = icir
if h == 1:
results["ic_1d_series"] = ic_series
# Quintile analysis at 5-day horizon
fwd_5d = prices.pct_change(5).shift(-5)
quintiles = compute_quintile_returns(signal, fwd_5d)
if not quintiles.empty:
results["q5_return"] = float(quintiles.iloc[-1]) # top quintile
results["q1_return"] = float(quintiles.iloc[0]) # bottom quintile
results["long_short_ann"] = float(quintiles.iloc[-1] - quintiles.iloc[0])
results["monotonicity"] = float(quintiles.corr(pd.Series(range(1, len(quintiles) + 1), index=quintiles.index)))
results["quintile_returns"] = quintiles
else:
results["q5_return"] = np.nan
results["q1_return"] = np.nan
results["long_short_ann"] = np.nan
results["monotonicity"] = np.nan
# Turnover
results["turnover"] = compute_turnover(signal)
# Sub-period IC stability (rolling 252-day IC mean)
if "ic_1d_series" in results and len(results["ic_1d_series"]) > 252:
rolling_ic = results["ic_1d_series"].rolling(252).mean().dropna()
results["ic_stability"] = float((rolling_ic > 0).mean()) # fraction of time IC > 0
results["ic_worst_year"] = float(rolling_ic.min())
results["ic_best_year"] = float(rolling_ic.max())
else:
results["ic_stability"] = np.nan
results["ic_worst_year"] = np.nan
results["ic_best_year"] = np.nan
return results
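# Usage sketch for a single factor (assumes a `prices` DataFrame of
# dates x tickers, as produced by data_manager.load):
#   sig = factor_recovery(prices)
#   stats = analyze_factor("recovery", sig, prices)
#   print(stats["ic_5d"], stats["icir_5d"], stats["long_short_ann"])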
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def get_all_factors(prices: pd.DataFrame, market: str) -> dict[str, pd.DataFrame]:
"""Build all candidate factor signals."""
factors = {}
# Known baselines
factors["momentum_12_1"] = factor_momentum_12_1(prices)
factors["recovery"] = factor_recovery(prices)
factors["inverse_vol"] = factor_inverse_vol(prices)
# New candidates — universal
factors["short_term_reversal"] = factor_short_term_reversal(prices)
factors["idio_vol_change"] = factor_idio_vol_change(prices)
factors["return_acceleration"] = factor_volume_price_divergence(prices)
factors["drawdown_recovery"] = factor_max_drawdown_recovery(prices)
factors["neg_skewness"] = factor_skewness(prices)
factors["range_compression"] = factor_high_low_range(prices)
factors["mean_rev_zscore"] = factor_mean_reversion_residual(prices)
factors["up_down_vol_ratio"] = factor_up_down_vol_ratio(prices)
factors["win_streak"] = factor_consecutive_up_days(prices)
factors["smooth_momentum"] = factor_gap_momentum(prices)
factors["recovery_accel"] = factor_recovery_acceleration(prices)
factors["trend_r2"] = factor_trend_strength(prices)
factors["cheap_momentum"] = factor_relative_volume_momentum(prices)
factors["near_52w_high"] = factor_52w_high_distance(prices)
factors["low_downside_beta"] = factor_downside_beta_proxy(prices)
    # A-share specific hypotheses (built only for the CN universe)
if market == "cn":
factors["illiquidity"] = factor_liquidity_premium(prices)
factors["anti_lottery"] = factor_lottery_demand(prices)
factors["vol_reversal"] = factor_turnover_reversal(prices)
factors["low_price"] = factor_price_level(prices)
return factors
def print_summary_table(results: list[dict], market: str) -> None:
"""Print a ranked summary of all factors."""
rows = []
for r in results:
rows.append({
"Factor": r["name"],
"IC_1d": r.get("ic_1d", np.nan),
"ICIR_1d": r.get("icir_1d", np.nan),
"IC_5d": r.get("ic_5d", np.nan),
"ICIR_5d": r.get("icir_5d", np.nan),
"IC_20d": r.get("ic_20d", np.nan),
"ICIR_20d": r.get("icir_20d", np.nan),
"LS_5d_ann": r.get("long_short_ann", np.nan),
"Mono": r.get("monotonicity", np.nan),
"Turnover": r.get("turnover", np.nan),
"IC_stab": r.get("ic_stability", np.nan),
"IC_worst_yr": r.get("ic_worst_year", np.nan),
})
df = pd.DataFrame(rows).set_index("Factor")
df = df.sort_values("ICIR_5d", ascending=False)
print(f"\n{'='*100}")
print(f" FACTOR RESEARCH RESULTS — {market.upper()} MARKET")
print(f"{'='*100}")
print("\nRanked by 5-day ICIR (most important metric for tradeable alpha):\n")
print(df.round(4).to_string())
# Highlight top factors
print(f"\n{'='*100}")
print(" TOP FACTORS (ICIR_5d > 0.05 and IC_stability > 0.6)")
print(f"{'='*100}")
top = df[(df["ICIR_5d"].abs() > 0.05) & (df["IC_stab"] > 0.6)]
if top.empty:
top = df.head(5)
print(" (No factor met strict threshold; showing top 5 by ICIR_5d)")
print(top.round(4).to_string())
# Quintile details for top factors
print(f"\n{'='*100}")
print(" QUINTILE RETURN PROFILES (annualized, 5-day forward)")
print(f"{'='*100}")
for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:8]:
qr = r.get("quintile_returns")
if qr is not None and not qr.empty:
q_str = " ".join(f"Q{int(k)}: {v:+.1%}" for k, v in qr.items())
ls = r.get("long_short_ann", 0)
print(f" {r['name']:25s} | {q_str} | L/S: {ls:+.1%}")
def main():
parser = argparse.ArgumentParser(description="Factor research")
parser.add_argument("--market", default="us", choices=["us", "cn"])
parser.add_argument("--years", type=int, default=None, help="Limit to last N years")
args = parser.parse_args()
market = args.market
config = UNIVERSES[market]
benchmark = config["benchmark"]
print(f"Loading {market.upper()} price data...")
prices = data_manager.load(market)
# Remove benchmark from stock universe
stocks = prices.drop(columns=[benchmark], errors="ignore")
if args.years:
cutoff = stocks.index[-1] - pd.DateOffset(years=args.years)
stocks = stocks[stocks.index >= cutoff]
print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} trading days")
print(f"Date range: {stocks.index[0].date()} to {stocks.index[-1].date()}")
# Build all factor signals
print("\nComputing factor signals...")
factors = get_all_factors(stocks, market)
# Analyze each factor
print("Running factor analysis (this may take a few minutes)...")
results = []
for name, signal in factors.items():
print(f" Analyzing: {name}...")
r = analyze_factor(name, signal, stocks)
results.append(r)
# Print results
print_summary_table(results, market)
# Factor correlation matrix
print(f"\n{'='*100}")
print(" FACTOR CORRELATION MATRIX (rank-averaged cross-sectional)")
print(f"{'='*100}")
corr = compute_factor_correlation(factors)
# Show only top factors
top_names = [r["name"] for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:10]]
top_names_present = [n for n in top_names if n in corr.index]
print(corr.loc[top_names_present, top_names_present].round(2).to_string())
if __name__ == "__main__":
main()