Add 32 factor-combo strategies with configurable rebalancing frequency

New FactorComboStrategy class (strategies/factor_combo.py) implements
8 champion factor signals (4 US, 4 CN) discovered through iterative
factor research, each at 4 rebalancing frequencies (daily/weekly/
biweekly/monthly). Registered in trader.py as fc_{signal}_{freq}.

Existing strategies and state files are untouched — safe to git pull
and restart monitor on server.

Also includes factor research scripts (factor_loop.py, factor_research.py,
etc.) used to discover and validate these factors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
commit ae25f2f6b5 (parent a66b039d2d)
2026-04-08 10:41:34 +08:00
13 changed files with 3402 additions and 1 deletion

.gitignore (8 changes)

@@ -21,6 +21,14 @@ data/universe_*.json
# Trader state — per-machine, regenerated by auto/simulate
data/trader_*.json
# Factor attribution output and cached factors
data/attribution_*/
data/factors/
data/factors_review_tmp/
# External tool artifacts
docs/superpowers/
# IDE / editor
.idea/
.vscode/

View File

@@ -44,7 +44,7 @@ No test suite or linter is configured.
**Backtest engine** (`main.py`): Orchestrates data loading, strategy execution, and visualization. The `backtest()` function is vectorized — it takes a strategy and price DataFrame, applies transaction costs (proportional + optional fixed per-trade fee) via turnover, and returns an equity curve. Supports two execution modes: `close` (classic) and `open-close` (signal on open prices, execute at close).
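A paraphrase of the cost mechanics (a sketch only; the real `backtest()` signature differs and additionally supports the fixed per-trade fee):

```python
import pandas as pd

def equity_curve(weights: pd.DataFrame, prices: pd.DataFrame, cost: float = 0.001) -> pd.Series:
    ret = prices.pct_change().fillna(0.0)
    port_ret = (weights * ret).sum(axis=1)            # daily portfolio return
    port_ret -= weights.diff().abs().sum(axis=1) * cost  # proportional cost charged on turnover
    return (1 + port_ret).cumprod()
```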
**Daily trader** (`trader.py`): Live/forward-testing system with persistent portfolio state in `data/trader_{market}_{strategy}.json`. The `auto` subcommand runs both signal generation and execution in a single invocation — designed for cron. The `simulate` subcommand replays a date range day-by-day with realistic portfolio tracking (fractional shares, cash, commissions). Available strategies: `recovery_mom_top10`, `recovery_mom_top20`, `momentum`, `momentum_quality`, `dual_momentum`, `inverse_vol`, `trend_following`, `buy_and_hold`.
**Daily trader** (`trader.py`): Live/forward-testing system with persistent portfolio state in `data/trader_{market}_{strategy}.json`. The `auto` subcommand runs both signal generation and execution in a single invocation — designed for cron. The `simulate` subcommand replays a date range day-by-day with realistic portfolio tracking (fractional shares, cash, commissions). Available strategies: `recovery_mom_top10`, `recovery_mom_top20`, `momentum`, `momentum_quality`, `dual_momentum`, `inverse_vol`, `trend_following`, `buy_and_hold`, plus 32 factor-combo strategies (`fc_{signal}_{freq}` — see `strategies/factor_combo.py`).
**Strategy protocol** (`strategies/base.py`): All strategies inherit from `Strategy` ABC and implement `generate_signals(data) → DataFrame` where the returned DataFrame contains portfolio weights (rows = dates, columns = assets, values sum to ~1.0 per row). Each strategy is responsible for applying its own 1-day lag via `.shift(1)` to avoid lookahead bias — the backtest engine does not shift.
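For orientation, a minimal conforming strategy might look like the following sketch (hypothetical, not code from this commit; only the `Strategy` base class and the `generate_signals` contract come from the protocol above):

```python
import numpy as np
import pandas as pd
from strategies.base import Strategy

class Top10Momentum(Strategy):
    """Hypothetical example: equal-weight the top-10 names by 12-1 momentum."""

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        mom = data.shift(21).pct_change(231)       # 12-1 month momentum
        rank = mom.rank(axis=1, ascending=False)
        raw = (rank <= 10).astype(float)
        # Equal-weight the selected names; rows sum to ~1.0
        weights = raw.div(raw.sum(axis=1).replace(0, np.nan), axis=0).fillna(0.0)
        return weights.shift(1).fillna(0.0)        # apply the strategy's own 1-day lag
```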
@@ -59,6 +59,7 @@ No test suite or linter is configured.
- `momentum_quality.py` — Momentum + return consistency + low drawdown
- `adaptive_momentum.py` — Momentum weighted by inverse volatility
- `recovery_momentum.py` — Recovery (price/63d low) + 12-1mo momentum composite. Best US performer.
- `factor_combo.py` — Configurable factor-combination strategies with daily/weekly/biweekly/monthly rebalancing. US champions: `rec_mfilt+deep_upvol` (50.7% CAGR monthly), `ma200+mom7m+rec126`, `rec_mfilt+ma200`, `mom7m+rec126`. CN champions: `up_cap+quality_mom` (26.1% CAGR monthly), `down_resil+qual_mom`, `rec63+mom_gap`, `up_cap+mom_gap`. All registered in trader.py as `fc_{signal}_{freq}` (e.g., `fc_rec_mfilt_deep_upvol_monthly`). 32 new strategies total.
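As an illustration of the naming scheme, the 32 registrations could be enumerated roughly as below (a sketch; the dict name, constructor signature, and exact signal keys are assumptions rather than the actual `trader.py` API):

```python
from strategies.factor_combo import FactorComboStrategy

SIGNALS = [  # the 8 champion signals; exact keys are illustrative
    "rec_mfilt_deep_upvol", "ma200_mom7m_rec126", "rec_mfilt_ma200", "mom7m_rec126",  # US
    "up_cap_quality_mom", "down_resil_qual_mom", "rec63_mom_gap", "up_cap_mom_gap",   # CN
]
FREQS = {"daily": 1, "weekly": 5, "biweekly": 10, "monthly": 21}  # rebalance cadence in trading days

FACTOR_COMBOS = {
    f"fc_{sig}_{freq}": FactorComboStrategy(signal=sig, rebal_days=days)
    for sig in SIGNALS
    for freq, days in FREQS.items()
}  # 8 signals x 4 frequencies = 32 strategies, e.g. fc_rec_mfilt_deep_upvol_monthly
```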
**Metrics** (`metrics.py`): Standalone functions for portfolio analytics (Sharpe, Sortino, Calmar, max drawdown, etc.). `summary()` prints a formatted report and returns a dict.

factor_backtest.py (new file, 213 lines)

@@ -0,0 +1,213 @@
"""
Backtest best factor combinations with yearly return breakdown.
US best: momentum + recovery + low_downside_beta + short_term_reversal
CN best: momentum + near_52w_high + vol_reversal
"""
from __future__ import annotations
import argparse
import numpy as np
import pandas as pd
import data_manager
import metrics
from universe import UNIVERSES
from factor_research import (
factor_momentum_12_1,
factor_recovery,
factor_short_term_reversal,
factor_downside_beta_proxy,
factor_lottery_demand,
factor_turnover_reversal,
factor_52w_high_distance,
)
def build_strategy_signals(
prices: pd.DataFrame,
factor_funcs: list,
weights: list[float],
top_n: int = 10,
rebal_freq: int = 21,
) -> pd.DataFrame:
"""Build equal-weight top-N strategy from ranked factor combination."""
signals_list = [f(prices) for f in factor_funcs]
ranked = [s.rank(axis=1, pct=True, na_option="keep") for s in signals_list]
composite = sum(w * r for w, r in zip(weights, ranked))
# Warmup: need at least 252 days
warmup = 252
rank = composite.rank(axis=1, ascending=False, na_option="bottom")
n_valid = composite.notna().sum(axis=1)
enough = n_valid >= top_n
top_mask = (rank <= top_n) & enough.values.reshape(-1, 1)
raw = top_mask.astype(float)
row_sums = raw.sum(axis=1).replace(0, np.nan)
signals = raw.div(row_sums, axis=0).fillna(0.0)
    # Rebalance every rebal_freq trading days (monthly by default)
rebal_mask = pd.Series(False, index=prices.index)
rebal_indices = list(range(warmup, len(prices), rebal_freq))
rebal_mask.iloc[rebal_indices] = True
signals[~rebal_mask] = np.nan
signals = signals.ffill().fillna(0.0)
signals.iloc[:warmup] = 0.0
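    # Shift one day so weights chosen at t are traded at t+1 (no lookahead)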
return signals.shift(1).fillna(0.0)
def backtest_equity(signals: pd.DataFrame, prices: pd.DataFrame, cost: float = 0.001) -> pd.Series:
"""Simple vectorized backtest returning equity curve."""
returns = prices.pct_change().fillna(0.0)
port_ret = (signals * returns).sum(axis=1)
# Transaction costs via turnover
turnover = signals.diff().abs().sum(axis=1)
port_ret -= turnover * cost
equity = (1 + port_ret).cumprod() * 100000
return equity
def yearly_returns(equity: pd.Series) -> pd.DataFrame:
"""Compute calendar year returns from equity curve."""
daily_ret = equity.pct_change().fillna(0)
years = daily_ret.index.year
rows = []
for year in sorted(years.unique()):
mask = years == year
yr_ret = (1 + daily_ret[mask]).prod() - 1
# Also compute max drawdown for the year
eq_yr = equity[mask]
running_max = eq_yr.cummax()
dd = (eq_yr - running_max) / running_max
rows.append({
"year": year,
"return": yr_ret,
"max_dd": dd.min(),
"start_val": float(eq_yr.iloc[0]),
"end_val": float(eq_yr.iloc[-1]),
})
return pd.DataFrame(rows).set_index("year")
def run(market: str, years_list: list[int]):
config = UNIVERSES[market]
benchmark = config["benchmark"]
print(f"Loading {market.upper()} price data...")
prices = data_manager.load(market)
bench_prices = prices[benchmark] if benchmark in prices.columns else None
stocks = prices.drop(columns=[benchmark], errors="ignore")
if market == "us":
label = "Mom+Recovery+LowDBeta+STR"
factor_funcs = [factor_momentum_12_1, factor_recovery, factor_downside_beta_proxy, factor_short_term_reversal]
weights = [0.25, 0.25, 0.25, 0.25]
baseline_label = "Recovery+Mom (baseline)"
baseline_funcs = [factor_momentum_12_1, factor_recovery]
baseline_weights = [0.5, 0.5]
else:
label = "Mom+Near52wHigh+VolReversal"
factor_funcs = [factor_momentum_12_1, factor_52w_high_distance, factor_turnover_reversal]
weights = [0.40, 0.30, 0.30]
baseline_label = "Mom+Recovery (baseline)"
baseline_funcs = [factor_momentum_12_1, factor_recovery]
baseline_weights = [0.5, 0.5]
for top_n in [10]:
print(f"\n{'='*90}")
print(f" {market.upper()} — Top {top_n}{label}")
print(f"{'='*90}")
# Best combo
sig = build_strategy_signals(stocks, factor_funcs, weights, top_n=top_n)
eq = backtest_equity(sig, stocks)
# Baseline
sig_base = build_strategy_signals(stocks, baseline_funcs, baseline_weights, top_n=top_n)
eq_base = backtest_equity(sig_base, stocks)
# Benchmark
if bench_prices is not None:
bp = bench_prices.dropna()
eq_bench = bp / bp.iloc[0] * 100000
for n_years in years_list:
cutoff = stocks.index[-1] - pd.DateOffset(years=n_years)
eq_slice = eq[eq.index >= cutoff]
eq_base_slice = eq_base[eq_base.index >= cutoff]
if len(eq_slice) < 50:
continue
# Normalize to starting capital
eq_norm = eq_slice / eq_slice.iloc[0] * 100000
eq_base_norm = eq_base_slice / eq_base_slice.iloc[0] * 100000
yr = yearly_returns(eq_norm)
yr_base = yearly_returns(eq_base_norm)
if bench_prices is not None:
eq_bench_slice = eq_bench[eq_bench.index >= cutoff]
eq_bench_norm = eq_bench_slice / eq_bench_slice.iloc[0] * 100000
yr_bench = yearly_returns(eq_bench_norm)
print(f"\n--- Last {n_years} Years (from {eq_slice.index[0].date()}) ---\n")
# Combined table
print(f" {'Year':<6} | {label:>30} | {baseline_label:>25} | {'Benchmark':>12} | {'Alpha vs Bench':>14}")
print(f" {'-'*6}-+-{'-'*30}-+-{'-'*25}-+-{'-'*12}-+-{'-'*14}")
all_years = sorted(yr.index.tolist())
total_new = 1.0
total_base = 1.0
total_bench = 1.0
for y in all_years:
r_new = yr.loc[y, "return"] if y in yr.index else 0
dd_new = yr.loc[y, "max_dd"] if y in yr.index else 0
r_base = yr_base.loc[y, "return"] if y in yr_base.index else 0
r_bench = yr_bench.loc[y, "return"] if bench_prices is not None and y in yr_bench.index else 0
alpha = r_new - r_bench
total_new *= (1 + r_new)
total_base *= (1 + r_base)
total_bench *= (1 + r_bench)
print(f" {y:<6} | {r_new:>+14.2%} (dd {dd_new:>+7.2%}) | {r_base:>+25.2%} | {r_bench:>+12.2%} | {alpha:>+14.2%}")
total_r_new = total_new - 1
total_r_base = total_base - 1
total_r_bench = total_bench - 1
cagr_new = (total_new ** (1 / n_years)) - 1
cagr_base = (total_base ** (1 / n_years)) - 1
cagr_bench = (total_bench ** (1 / n_years)) - 1
print(f" {'-'*6}-+-{'-'*30}-+-{'-'*25}-+-{'-'*12}-+-{'-'*14}")
print(f" {'Total':<6} | {total_r_new:>+14.2%}{' '*16} | {total_r_base:>+25.2%} | {total_r_bench:>+12.2%} |")
print(f" {'CAGR':<6} | {cagr_new:>+14.2%}{' '*16} | {cagr_base:>+25.2%} | {cagr_bench:>+12.2%} |")
        # Metrics over the longest window examined (the final n_years slice)
        print(f"\n Metrics over the last {n_years} years ({label}):")
daily_ret = eq_norm.pct_change().dropna()
sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252) if daily_ret.std() > 0 else 0
running_max = eq_norm.cummax()
max_dd = ((eq_norm - running_max) / running_max).min()
print(f" Sharpe: {sharpe:.2f} | Max Drawdown: {max_dd:.2%} | Win Rate: {(daily_ret > 0).mean():.2%}")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--market", default="us", choices=["us", "cn"])
args = parser.parse_args()
run(args.market, years_list=[3, 5, 10])
if __name__ == "__main__":
main()
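Usage: `python factor_backtest.py --market us` (or `--market cn`); `main()` requests 3-, 5-, and 10-year windows via `years_list`.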

factor_deep_analysis.py (new file, 324 lines)

@@ -0,0 +1,324 @@
"""
Deep factor analysis — orthogonality, proper correlations, residual alpha.
For the top factor candidates identified in factor_research.py, this script:
1. Computes proper daily cross-sectional rank correlations between factors
2. Tests residual IC after neutralizing known factors (momentum, recovery)
3. Runs sub-period breakdown (2-year windows)
4. Tests factor combinations
"""
from __future__ import annotations
import argparse
import warnings
import numpy as np
import pandas as pd
import data_manager
from universe import UNIVERSES
from factor_research import (
factor_momentum_12_1,
factor_recovery,
factor_inverse_vol,
factor_short_term_reversal,
factor_idio_vol_change,
factor_max_drawdown_recovery,
factor_mean_reversion_residual,
factor_skewness,
factor_high_low_range as factor_range_compression,
factor_52w_high_distance as factor_near_52w_high,
factor_downside_beta_proxy as factor_low_downside_beta,
factor_lottery_demand,
factor_turnover_reversal,
factor_gap_momentum as factor_smooth_momentum,
factor_up_down_vol_ratio,
factor_trend_strength,
factor_consecutive_up_days,
factor_volume_price_divergence,
factor_recovery_acceleration,
factor_relative_volume_momentum,
factor_price_level,
factor_liquidity_premium,
compute_ic,
)
warnings.filterwarnings("ignore", category=FutureWarning)
def daily_cross_sectional_correlation(
sig_a: pd.DataFrame, sig_b: pd.DataFrame
) -> pd.Series:
"""Daily cross-sectional Spearman correlation between two factor signals."""
common_idx = sig_a.index.intersection(sig_b.index)
common_cols = sig_a.columns.intersection(sig_b.columns)
a = sig_a.loc[common_idx, common_cols]
b = sig_b.loc[common_idx, common_cols]
corrs = {}
for date in common_idx:
va = a.loc[date].dropna()
vb = b.loc[date].dropna()
common = va.index.intersection(vb.index)
if len(common) < 30:
continue
c = va[common].corr(vb[common], method="spearman")
if np.isfinite(c):
corrs[date] = c
return pd.Series(corrs)
def proper_factor_correlation_matrix(factors: dict[str, pd.DataFrame]) -> pd.DataFrame:
"""Compute average daily cross-sectional Spearman correlations."""
names = list(factors.keys())
n = len(names)
matrix = pd.DataFrame(1.0, index=names, columns=names)
for i in range(n):
for j in range(i + 1, n):
corr_series = daily_cross_sectional_correlation(factors[names[i]], factors[names[j]])
avg_corr = corr_series.mean() if len(corr_series) > 0 else np.nan
matrix.loc[names[i], names[j]] = avg_corr
matrix.loc[names[j], names[i]] = avg_corr
return matrix
def residual_signal(
target: pd.DataFrame,
controls: list[pd.DataFrame],
) -> pd.DataFrame:
"""Cross-sectionally orthogonalize target signal against control signals.
For each day, regress target ranks on control ranks, return residual."""
ranked_target = target.rank(axis=1, pct=True, na_option="keep")
ranked_controls = [c.rank(axis=1, pct=True, na_option="keep") for c in controls]
residuals = pd.DataFrame(index=target.index, columns=target.columns, dtype=float)
for date in target.index:
y = ranked_target.loc[date].dropna()
xs = [rc.loc[date].reindex(y.index) for rc in ranked_controls if date in rc.index]
if not xs:
residuals.loc[date] = y
continue
x_df = pd.concat(xs, axis=1).dropna()
common = y.index.intersection(x_df.index)
if len(common) < 30:
continue
y_c = y[common].values
x_c = x_df.loc[common].values
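        # Prepend an intercept so the fit also removes the daily cross-sectional mean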
x_c = np.column_stack([np.ones(len(common)), x_c])
try:
coef, _, _, _ = np.linalg.lstsq(x_c, y_c, rcond=None)
resid = y_c - x_c @ coef
residuals.loc[date, common] = resid
except np.linalg.LinAlgError:
residuals.loc[date, common] = y[common].values
return residuals
def subperiod_ic(signal: pd.DataFrame, prices: pd.DataFrame, horizon: int = 5, window_years: int = 2):
"""Compute IC for each rolling sub-period."""
fwd_ret = prices.pct_change(horizon).shift(-horizon)
ic_series = compute_ic(signal, fwd_ret)
if len(ic_series) == 0:
return pd.DataFrame()
results = []
start = ic_series.index[0]
while start < ic_series.index[-1]:
end = start + pd.DateOffset(years=window_years)
subset = ic_series[(ic_series.index >= start) & (ic_series.index < end)]
if len(subset) > 100:
results.append({
"period": f"{start.year}-{end.year}",
"ic_mean": subset.mean(),
"ic_std": subset.std(),
"icir": subset.mean() / subset.std() if subset.std() > 0 else 0,
"pct_positive": (subset > 0).mean(),
"n_days": len(subset),
})
start = end
return pd.DataFrame(results)
def test_factor_combination(
factors: dict[str, pd.DataFrame],
factor_names: list[str],
weights: list[float],
prices: pd.DataFrame,
label: str,
):
"""Test a weighted combination of factors."""
ranked = [factors[n].rank(axis=1, pct=True, na_option="keep") for n in factor_names]
combo = sum(w * r for w, r in zip(weights, ranked))
fwd_5d = prices.pct_change(5).shift(-5)
ic_series = compute_ic(combo, fwd_5d)
if len(ic_series) == 0:
return None
return {
"combo": label,
"ic_5d": ic_series.mean(),
"icir_5d": ic_series.mean() / ic_series.std() if ic_series.std() > 0 else 0,
"ic_stab": (ic_series.rolling(252).mean().dropna() > 0).mean() if len(ic_series) > 252 else np.nan,
}
def run_analysis(market: str):
config = UNIVERSES[market]
benchmark = config["benchmark"]
print(f"Loading {market.upper()} price data...")
prices = data_manager.load(market)
stocks = prices.drop(columns=[benchmark], errors="ignore")
print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} days")
# Build factors
print("Computing factors...")
factors = {}
factors["momentum_12_1"] = factor_momentum_12_1(stocks)
factors["recovery"] = factor_recovery(stocks)
factors["inverse_vol"] = factor_inverse_vol(stocks)
factors["short_term_reversal"] = factor_short_term_reversal(stocks)
factors["drawdown_recovery"] = factor_max_drawdown_recovery(stocks)
factors["mean_rev_zscore"] = factor_mean_reversion_residual(stocks)
factors["neg_skewness"] = factor_skewness(stocks)
factors["near_52w_high"] = factor_near_52w_high(stocks)
factors["low_downside_beta"] = factor_low_downside_beta(stocks)
factors["smooth_momentum"] = factor_smooth_momentum(stocks)
factors["recovery_accel"] = factor_recovery_acceleration(stocks)
factors["range_compression"] = factor_range_compression(stocks)
if market == "cn":
factors["anti_lottery"] = factor_lottery_demand(stocks)
factors["vol_reversal"] = factor_turnover_reversal(stocks)
factors["low_price"] = factor_price_level(stocks)
factors["illiquidity"] = factor_liquidity_premium(stocks)
# ---- 1. Proper Cross-Sectional Correlation Matrix ----
print("\n" + "=" * 90)
print(f" 1. CROSS-SECTIONAL FACTOR CORRELATIONS — {market.upper()}")
print("=" * 90)
print("(Average daily Spearman correlation between factor ranks)\n")
corr = proper_factor_correlation_matrix(factors)
print(corr.round(3).to_string())
# ---- 2. Residual IC after neutralizing known factors ----
print("\n" + "=" * 90)
print(f" 2. RESIDUAL IC AFTER NEUTRALIZING KNOWN FACTORS — {market.upper()}")
print("=" * 90)
print("(IC of factor after cross-sectionally regressing out momentum + recovery)\n")
known = [factors["momentum_12_1"], factors["recovery"]]
fwd_5d = stocks.pct_change(5).shift(-5)
new_candidates = [k for k in factors if k not in ("momentum_12_1", "recovery", "inverse_vol")]
rows = []
for name in new_candidates:
resid = residual_signal(factors[name], known)
ic_series = compute_ic(resid, fwd_5d)
if len(ic_series) > 0:
rows.append({
"factor": name,
"raw_ic_5d": compute_ic(factors[name], fwd_5d).mean(),
"residual_ic_5d": ic_series.mean(),
"residual_icir_5d": ic_series.mean() / ic_series.std() if ic_series.std() > 0 else 0,
"pct_pos": (ic_series > 0).mean(),
})
resid_df = pd.DataFrame(rows).set_index("factor").sort_values("residual_icir_5d", ascending=False)
print(resid_df.round(4).to_string())
# ---- 3. Sub-Period Stability ----
print("\n" + "=" * 90)
print(f" 3. SUB-PERIOD IC STABILITY (2-year windows, 5-day horizon) — {market.upper()}")
print("=" * 90)
# Test top factors
if market == "us":
top_factors = ["low_downside_beta", "drawdown_recovery", "mean_rev_zscore", "short_term_reversal", "momentum_12_1"]
else:
top_factors = ["momentum_12_1", "anti_lottery", "inverse_vol", "vol_reversal", "near_52w_high"]
for name in top_factors:
if name not in factors:
continue
print(f"\n {name}:")
sp = subperiod_ic(factors[name], stocks, horizon=5)
if not sp.empty:
print(sp.to_string(index=False))
else:
print(" (insufficient data)")
# ---- 4. Factor Combinations ----
print("\n" + "=" * 90)
print(f" 4. FACTOR COMBINATIONS — {market.upper()}")
print("=" * 90)
print("(Testing multi-factor composites)\n")
combos = []
if market == "us":
tests = [
(["momentum_12_1", "low_downside_beta"], [0.5, 0.5], "mom+low_dbeta"),
(["momentum_12_1", "drawdown_recovery"], [0.5, 0.5], "mom+dd_recovery"),
(["momentum_12_1", "mean_rev_zscore"], [0.5, 0.5], "mom+mean_rev"),
(["momentum_12_1", "short_term_reversal"], [0.5, 0.5], "mom+STR"),
(["recovery", "low_downside_beta"], [0.5, 0.5], "recovery+low_dbeta"),
(["momentum_12_1", "recovery", "low_downside_beta"], [0.33, 0.33, 0.34], "mom+rec+low_dbeta"),
(["momentum_12_1", "recovery", "drawdown_recovery"], [0.33, 0.33, 0.34], "mom+rec+dd_rec"),
(["momentum_12_1", "recovery", "short_term_reversal"], [0.33, 0.33, 0.34], "mom+rec+STR"),
(["momentum_12_1", "recovery", "mean_rev_zscore"], [0.33, 0.33, 0.34], "mom+rec+meanrev"),
(["momentum_12_1", "recovery", "low_downside_beta", "short_term_reversal"],
[0.25, 0.25, 0.25, 0.25], "mom+rec+dbeta+STR"),
(["momentum_12_1", "recovery", "drawdown_recovery", "mean_rev_zscore"],
[0.25, 0.25, 0.25, 0.25], "mom+rec+ddrec+meanrev"),
]
else: # cn
tests = [
(["momentum_12_1", "anti_lottery"], [0.5, 0.5], "mom+anti_lottery"),
(["momentum_12_1", "inverse_vol"], [0.5, 0.5], "mom+inv_vol"),
(["momentum_12_1", "vol_reversal"], [0.5, 0.5], "mom+vol_reversal"),
(["momentum_12_1", "near_52w_high"], [0.5, 0.5], "mom+near52wh"),
(["momentum_12_1", "anti_lottery", "inverse_vol"], [0.33, 0.33, 0.34], "mom+alot+invvol"),
(["momentum_12_1", "anti_lottery", "vol_reversal"], [0.33, 0.33, 0.34], "mom+alot+volrev"),
(["momentum_12_1", "anti_lottery", "near_52w_high"], [0.33, 0.33, 0.34], "mom+alot+near52w"),
(["momentum_12_1", "recovery", "anti_lottery"], [0.33, 0.33, 0.34], "mom+rec+alot"),
(["momentum_12_1", "anti_lottery", "inverse_vol", "vol_reversal"],
[0.25, 0.25, 0.25, 0.25], "mom+alot+invvol+volrev"),
(["momentum_12_1", "anti_lottery", "near_52w_high", "vol_reversal"],
[0.25, 0.25, 0.25, 0.25], "mom+alot+52wh+volrev"),
]
# Also test the existing recovery+momentum baseline
baseline = test_factor_combination(factors, ["momentum_12_1", "recovery"], [0.5, 0.5], stocks, "BASELINE: mom+recovery")
if baseline:
combos.append(baseline)
for names, weights, label in tests:
if all(n in factors for n in names):
result = test_factor_combination(factors, names, weights, stocks, label)
if result:
combos.append(result)
combo_df = pd.DataFrame(combos).set_index("combo").sort_values("icir_5d", ascending=False)
print(combo_df.round(4).to_string())
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--market", default="us", choices=["us", "cn"])
args = parser.parse_args()
run_analysis(args.market)
if __name__ == "__main__":
main()

factor_final_check.py (new file, 150 lines)

@@ -0,0 +1,150 @@
"""Final robustness check on champions from the discovery loop."""
from __future__ import annotations
import warnings
import numpy as np
import pandas as pd
import data_manager
from universe import UNIVERSES
from factor_loop import (
strat, bt, stats, combo, yearly,
f_rec_mom, f_rec_126, f_rec_63,
f_mom_12_1, f_mom_6_1, f_mom_intermediate,
f_above_ma200, f_golden_cross,
f_up_volume_proxy, f_gap_up_freq,
f_rec_mom_filtered, f_down_resilience,
f_up_capture, f_52w_high, f_str_10d,
f_earnings_drift, f_reversal_vol,
)
warnings.filterwarnings("ignore")
def f_quality_mom(p):
mom = f_mom_12_1(p)
consist_ret = p.pct_change()
consist = (consist_ret > 0).astype(float).rolling(252, min_periods=126).mean()
mom_r = mom.rank(axis=1, pct=True, na_option="keep")
con_r = consist.rank(axis=1, pct=True, na_option="keep")
up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep")
return 0.4 * mom_r + 0.3 * con_r + 0.3 * up_r
def f_mom_x_gap(p):
mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
gap_r = f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep")
return mom_r * gap_r
def rolling_2yr(eq):
dr = eq.pct_change().dropna()
results = []
for end_i in range(504, len(dr), 63):
chunk = dr.iloc[end_i - 504:end_i]
tot = (1 + chunk).prod() - 1
ann = (1 + tot) ** (252 / len(chunk)) - 1
sh = chunk.mean() / chunk.std() * np.sqrt(252) if chunk.std() > 0 else 0
results.append({"end": chunk.index[-1].date(), "ann": ann, "sh": sh})
return pd.DataFrame(results)
def run_robustness(name, func, prices):
print(f"\n {name}:")
# Top-N sensitivity
print(f" Top-N: ", end="")
for n in [5, 10, 15, 20]:
w = strat(prices, func, top_n=n)
eq = bt(w, prices)
s = stats(eq)
print(f"N={n}: {s['cagr']:+.1%}/{s['sharpe']:.2f} ", end="")
print()
# Rebal sensitivity
print(f" Rebal: ", end="")
for r in [5, 10, 21, 42]:
w = strat(prices, func, top_n=10, rebal=r)
eq = bt(w, prices)
s = stats(eq)
print(f"{r}d: {s['cagr']:+.1%}/{s['sharpe']:.2f} ", end="")
print()
# Cost sensitivity
print(f" Cost: ", end="")
for c in [0, 0.001, 0.002, 0.005]:
w = strat(prices, func, top_n=10)
eq = bt(w, prices, cost=c)
s = stats(eq)
print(f"{c*1e4:.0f}bp: {s['cagr']:+.1%} ", end="")
print()
# Rolling 2-year
w = strat(prices, func, top_n=10)
eq = bt(w, prices)
roll = rolling_2yr(eq)
if not roll.empty:
pct_pos = (roll["ann"] > 0).mean()
print(f" 2yr rolling: mean={roll['ann'].mean():+.1%} min={roll['ann'].min():+.1%} "
f"max={roll['ann'].max():+.1%} %pos={pct_pos:.0%} mean_sharpe={roll['sh'].mean():.2f}")
def main():
# ============= US =============
prices_us = data_manager.load("us")
stocks_us = prices_us.drop(columns=["SPY"], errors="ignore")
print("=" * 95)
print(" US FINAL ROBUSTNESS — Champions vs Baseline")
print("=" * 95)
    us_champs = [
        ("BASELINE: rec+mom", f_rec_mom),
        ("rec_mom_filt + rec_deep×upvol",
         combo([
             (f_rec_mom_filtered, 0.5),
             (combo([(f_rec_126, 0.5), (f_up_volume_proxy, 0.5)]), 0.5),
         ])),
        ("above_ma200+mom_7m+rec_126d",
         combo([(f_above_ma200, 0.33), (f_mom_intermediate, 0.33), (f_rec_126, 0.34)])),
        ("rec_mom_filtered+above_ma200",
         combo([(f_rec_mom_filtered, 0.5), (f_above_ma200, 0.5)])),
        ("mom_7m+rec_126d",
         combo([(f_mom_intermediate, 0.5), (f_rec_126, 0.5)])),
    ]
    for name, func in us_champs:
        run_robustness(name, func, stocks_us)
# ============= CN =============
prices_cn = data_manager.load("cn")
stocks_cn = prices_cn.drop(columns=["000300.SS"], errors="ignore")
print(f"\n{'='*95}")
print(" CN FINAL ROBUSTNESS — Champions vs Baseline")
print("=" * 95)
cn_champs = [
("BASELINE: rec+mom", f_rec_mom),
("up_capture+quality_mom",
combo([(f_up_capture, 0.5), (f_quality_mom, 0.5)])),
("recovery_63d+mom×gap",
combo([(f_rec_63, 0.5), (f_mom_x_gap, 0.5)])),
("down_resilience+quality_mom",
combo([(f_down_resilience, 0.5), (f_quality_mom, 0.5)])),
("up_capture+mom×gap",
combo([(f_up_capture, 0.5), (f_mom_x_gap, 0.5)])),
]
    for name, func in cn_champs:
        run_robustness(name, func, stocks_cn)
if __name__ == "__main__":
main()

factor_loop.py (new file, 654 lines)

@@ -0,0 +1,654 @@
"""
Iterative Factor Discovery Loop.
Round 1: Academic & practitioner hypotheses (30+ factors)
Round 2: Data-driven variations on Round 1 winners
Round 3: Interaction and conditional factors
Round 4: Combinations of the top factors from all rounds
Round 5: Yearly-return breakdown of the best combinations
Each factor is tested immediately as a top-10 equal-weight strategy
with monthly rebalancing and 10bps transaction costs.
"""
from __future__ import annotations
import argparse
import warnings
from typing import Callable
import numpy as np
import pandas as pd
import data_manager
from universe import UNIVERSES
warnings.filterwarnings("ignore")
FactorFunc = Callable[[pd.DataFrame], pd.DataFrame]
# ---------------------------------------------------------------------------
# Backtest infrastructure
# ---------------------------------------------------------------------------
def strat(
prices: pd.DataFrame,
signal_func: FactorFunc,
top_n: int = 10,
rebal: int = 21,
warmup: int = 252,
) -> pd.DataFrame:
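    """Rank a factor signal cross-sectionally; hold the top-N names equal-weight,
    rebalancing every `rebal` trading days after a `warmup` period."""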
sig = signal_func(prices)
rank = sig.rank(axis=1, ascending=False, na_option="bottom")
n_valid = sig.notna().sum(axis=1)
enough = n_valid >= top_n
mask = (rank <= top_n) & enough.values.reshape(-1, 1)
raw = mask.astype(float)
w = raw.div(raw.sum(axis=1).replace(0, np.nan), axis=0).fillna(0.0)
rmask = pd.Series(False, index=prices.index)
rmask.iloc[list(range(warmup, len(prices), rebal))] = True
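    # Hold weights between rebalance dates; stay flat during warmup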
w[~rmask] = np.nan
w = w.ffill().fillna(0.0)
w.iloc[:warmup] = 0.0
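    # Shift one day so weights formed at t are traded at t+1 (no lookahead)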
return w.shift(1).fillna(0.0)
def bt(weights: pd.DataFrame, prices: pd.DataFrame, cost: float = 0.001) -> pd.Series:
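    """Vectorized backtest: weighted daily returns net of turnover costs, compounded to an equity curve."""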
ret = prices.pct_change().fillna(0.0)
pr = (weights * ret).sum(axis=1)
pr -= weights.diff().abs().sum(axis=1) * cost
return (1 + pr).cumprod() * 100000
def stats(eq: pd.Series) -> dict:
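    """Annualized stats from an equity curve: CAGR, Sharpe, Sortino, max drawdown, Calmar."""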
dr = eq.pct_change().dropna()
if len(dr) < 200 or dr.std() == 0:
return {"cagr": np.nan, "sharpe": np.nan, "sortino": np.nan,
"maxdd": np.nan, "calmar": np.nan}
ny = len(dr) / 252
tot = eq.iloc[-1] / eq.iloc[0] - 1
cagr = (1 + tot) ** (1 / ny) - 1
sh = dr.mean() / dr.std() * np.sqrt(252)
sd = dr[dr < 0].std()
so = dr.mean() / sd * np.sqrt(252) if sd > 0 else 0
rm = eq.cummax()
dd = ((eq - rm) / rm).min()
cal = cagr / abs(dd) if dd != 0 else 0
return {"cagr": cagr, "sharpe": sh, "sortino": so, "maxdd": dd, "calmar": cal}
def yearly(eq: pd.Series) -> dict[int, float]:
dr = eq.pct_change().fillna(0)
return {y: float((1 + dr[dr.index.year == y]).prod() - 1) for y in sorted(dr.index.year.unique())}
def test_factor(name: str, func: FactorFunc, prices: pd.DataFrame,
top_n: int = 10) -> dict:
w = strat(prices, func, top_n=top_n)
eq = bt(w, prices)
s = stats(eq)
s["name"] = name
s["equity"] = eq
return s
def combo(fws: list[tuple[FactorFunc, float]]) -> FactorFunc:
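    """Blend factors as a weighted sum of cross-sectional percentile ranks."""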
def _c(p):
return sum(w * f(p).rank(axis=1, pct=True, na_option="keep") for f, w in fws)
return _c
def print_results(results: list[dict], title: str):
df = pd.DataFrame([{k: v for k, v in r.items() if k != "equity"} for r in results])
df = df.set_index("name").sort_values("cagr", ascending=False)
print(f"\n{'='*95}")
print(f" {title}")
print(f"{'='*95}")
print(f" {'Factor':<45} {'CAGR':>7} {'Sharpe':>7} {'Sortino':>8} {'MaxDD':>7} {'Calmar':>7}")
print(f" {'-'*85}")
for name, row in df.iterrows():
flag = " <<<" if "BASELINE" in str(name) else ""
c = row['cagr']
if np.isnan(c):
continue
print(f" {str(name):<45} {c:>+6.1%} {row['sharpe']:>7.2f} {row['sortino']:>8.2f} "
f"{row['maxdd']:>+6.1%} {row['calmar']:>7.2f}{flag}")
return df
# =====================================================================
# ROUND 1 — Academic & Practitioner Hypotheses
# =====================================================================
# --- Momentum family ---
def f_mom_12_1(p): return p.shift(21).pct_change(231)
def f_mom_6_1(p): return p.shift(21).pct_change(105)
def f_mom_3_1(p): return p.shift(21).pct_change(42)
def f_mom_1_0(p): return p.pct_change(21) # 1-month (reversal in US)
# --- Recovery family ---
def f_rec_63(p): return p / p.rolling(63, min_periods=63).min() - 1
def f_rec_126(p): return p / p.rolling(126, min_periods=126).min() - 1
def f_rec_21(p): return p / p.rolling(21, min_periods=21).min() - 1
# Novy-Marx 2012: intermediate momentum (7-12 month)
def f_mom_intermediate(p): return p.shift(21).pct_change(147) # ~7 month
# Asness et al: quality/profitability proxy via return consistency
def f_consistent_returns(p):
ret = p.pct_change()
return (ret > 0).astype(float).rolling(252, min_periods=126).mean()
# Da, Liu, Schaumburg 2014: information discreteness
# Stocks with many small positive days > stocks with few large positive days
def f_info_discrete(p):
ret = p.pct_change()
n_pos = (ret > 0).astype(float).rolling(60, min_periods=40).sum()
sum_pos = ret.where(ret > 0, 0).rolling(60, min_periods=40).sum()
avg_pos = sum_pos / n_pos.replace(0, np.nan)
# High count of positive days + low average positive = smooth accumulation
return n_pos / avg_pos.replace(0, np.nan)
# Accumulation proxy (worked in Round 1)
def f_up_volume_proxy(p):
ret = p.pct_change()
return ret.where(ret > 0, 0).rolling(20, min_periods=15).sum()
# George & Hwang 2004: 52-week high ratio
def f_52w_high(p):
return p / p.rolling(252, min_periods=126).max()
# Frequency of large up-moves (worked in Round 1)
def f_gap_up_freq(p):
ret = p.pct_change()
return (ret > 0.01).astype(float).rolling(60, min_periods=40).mean()
# Bali, Cakici, Whitelaw 2011: MAX effect (lottery demand)
def f_anti_max(p):
ret = p.pct_change()
return -ret.rolling(20, min_periods=15).max()
# Ang et al 2006: idiosyncratic volatility (negative)
def f_neg_ivol(p):
ret = p.pct_change()
return -ret.rolling(20, min_periods=15).std()
# Blitz & van Vliet 2007: low volatility anomaly
def f_low_vol_60(p):
ret = p.pct_change()
return -ret.rolling(60, min_periods=40).std()
# Hurst exponent proxy — autocorrelation of returns
# Stocks with positive autocorrelation = trending
def f_autocorrelation(p):
ret = p.pct_change()
def _ac(x):
x = x.dropna()
if len(x) < 20:
return np.nan
return np.corrcoef(x[:-1], x[1:])[0, 1]
return ret.rolling(60, min_periods=40).apply(_ac, raw=False)
# Short-term reversal (Jegadeesh 1990)
def f_str_5d(p): return -p.pct_change(5)
def f_str_10d(p): return -p.pct_change(10)
# Earnings drift proxy (worked in Round 1)
def f_earnings_drift(p):
ret_5d = p.pct_change(5)
vol = p.pct_change().rolling(60, min_periods=40).std() * np.sqrt(5)
z = ret_5d / vol.replace(0, np.nan)
return z.rolling(60, min_periods=20).mean()
# Risk-adjusted momentum (Sharpe-momentum)
def f_sharpe_mom(p):
ret = p.pct_change()
mu = ret.rolling(252, min_periods=126).mean()
sigma = ret.rolling(252, min_periods=126).std()
return mu / sigma.replace(0, np.nan)
# Trend strength: slope of log-price regression
def f_trend_slope(p):
log_p = np.log(p.replace(0, np.nan))
def _slope(x):
x = x.dropna().values
if len(x) < 30:
return np.nan
t = np.arange(len(x), dtype=float)
t -= t.mean()
return (t * (x - x.mean())).sum() / (t * t).sum()
return log_p.rolling(60, min_periods=30).apply(_slope, raw=False)
# Acceleration: recent momentum vs. longer-term momentum
def f_mom_accel(p):
m3 = p.shift(5).pct_change(58) # ~3mo
m12 = p.shift(21).pct_change(231) # ~12mo
return m3 - m12
# Mean reversion z-score
def f_mean_rev_z(p):
ma20 = p.rolling(20, min_periods=20).mean()
vol = p.pct_change().rolling(60, min_periods=40).std() * p
return -(p - ma20) / vol.replace(0, np.nan)
# Price relative to moving averages
def f_above_ma200(p):
return p / p.rolling(200, min_periods=200).mean() - 1
def f_above_ma50(p):
return p / p.rolling(50, min_periods=50).mean() - 1
# Dual MA signal: 50-day MA / 200-day MA
def f_golden_cross(p):
ma50 = p.rolling(50, min_periods=50).mean()
ma200 = p.rolling(200, min_periods=200).mean()
return ma50 / ma200 - 1
# Drawdown recovery rate
def f_dd_recovery_rate(p):
rm = p.rolling(252, min_periods=126).max()
dd = p / rm - 1 # negative when in drawdown
return dd - dd.shift(20) # positive = recovering from drawdown
# A-share specific: short-term reversal x volatility
def f_reversal_vol(p):
return -p.pct_change(5) * p.pct_change().rolling(20, min_periods=15).std()
# Recovery + momentum (baseline)
def f_rec_mom(p):
r1 = f_rec_63(p).rank(axis=1, pct=True, na_option="keep")
r2 = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
return 0.5 * r1 + 0.5 * r2
# =====================================================================
# ROUND 2 — Second-order ideas from Round 1 analysis
# =====================================================================
# The key insight: "quality of returns" matters more than "magnitude of returns"
# Factors that measure HOW a stock goes up, not just that it went up.
# Smoothness-weighted momentum
def f_smooth_momentum(p):
"""Momentum penalized by path volatility. Stocks that go up smoothly."""
mom = p.shift(21).pct_change(231)
ret = p.pct_change()
vol = ret.rolling(252, min_periods=126).std()
return mom / (vol.replace(0, np.nan) ** 0.5) # sqrt to dampen
# Positive return ratio (like Sharpe numerator)
def f_pos_ratio_60(p):
"""Fraction of positive return days in 60 days. Quality signal."""
ret = p.pct_change()
return (ret > 0).astype(float).rolling(60, min_periods=40).mean()
# Cumulative positive returns vs cumulative negative returns
def f_up_down_asymmetry(p):
"""Ratio of cumulative up-move to cumulative down-move."""
ret = p.pct_change()
up = ret.where(ret > 0, 0).rolling(60, min_periods=40).sum()
down = (-ret.where(ret < 0, 0)).rolling(60, min_periods=40).sum()
return up / down.replace(0, np.nan)
# Streak momentum: max consecutive up days in last 40 days
def f_max_streak(p):
ret = p.pct_change()
pos = (ret > 0).astype(float)
def _max_streak(x):
x = x.dropna().values
if len(x) == 0:
return 0
best = cur = 0
for v in x:
cur = cur + 1 if v > 0.5 else 0
best = max(best, cur)
return best
return pos.rolling(40, min_periods=20).apply(_max_streak, raw=False)
# Overnight-gap factors aren't feasible with close-only data; the next factors work from close-to-close returns.
def f_up_capture(p):
"""Up-market capture ratio over 60 days."""
ret = p.pct_change()
mkt = ret.mean(axis=1)
up_mkt = mkt > 0
arr = ret.values.copy()
arr[~up_mkt.values, :] = np.nan
stock_up = pd.DataFrame(arr, index=ret.index, columns=ret.columns)
mkt_up_vals = mkt.where(up_mkt, np.nan)
stock_avg = stock_up.rolling(60, min_periods=20).mean()
mkt_avg = mkt_up_vals.rolling(60, min_periods=20).mean()
return stock_avg.div(mkt_avg, axis=0)
# Down-market resilience
def f_down_resilience(p):
"""How much LESS a stock falls on down-market days."""
ret = p.pct_change()
mkt = ret.mean(axis=1)
down_mkt = mkt < 0
arr = ret.values.copy()
arr[~down_mkt.values, :] = np.nan
down_ret = pd.DataFrame(arr, index=ret.index, columns=ret.columns)
return -down_ret.rolling(120, min_periods=30).mean()
# Recovery from rolling max with momentum filter
def f_rec_mom_filtered(p):
"""Recovery factor only for stocks with positive 6-month momentum.
Filters out dead-cat bounces."""
rec = p / p.rolling(126, min_periods=126).min() - 1
mom = p.shift(21).pct_change(105)
return rec.where(mom > 0, np.nan)
# Information discreteness v2: using the sign ratio
def f_sign_ratio(p):
"""Ratio of (count of positive days)^2 * avg_size to total return.
High ratio = many small ups = institutional flow."""
ret = p.pct_change()
n_total = 60
n_pos = (ret > 0).astype(float).rolling(n_total, min_periods=40).sum()
total_ret = ret.rolling(n_total, min_periods=40).sum()
sign_vol = n_pos / n_total
# Stocks where most of the return comes from many small positive days
return sign_vol * total_ret.clip(lower=0)
# =====================================================================
# ROUND 3 — Interaction & conditional factors
# =====================================================================
def f_mom_x_recovery(p):
"""Momentum × Recovery interaction. The product, not the sum."""
mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
rec_r = f_rec_63(p).rank(axis=1, pct=True, na_option="keep")
return mom_r * rec_r
def f_mom_x_upvol(p):
"""Momentum × Up-volume-proxy interaction."""
mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep")
return mom_r * up_r
def f_rec_deep_x_upvol(p):
"""Deep recovery × Up-volume interaction."""
rec_r = f_rec_126(p).rank(axis=1, pct=True, na_option="keep")
up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep")
return rec_r * up_r
def f_trend_x_mom(p):
"""Trend strength × Momentum. Trending + momentum = double signal."""
tr_r = f_trend_slope(p).rank(axis=1, pct=True, na_option="keep")
mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
return tr_r * mom_r
def f_quality_mom(p):
"""Momentum filtered by consistency. Only persistent winners."""
mom = f_mom_12_1(p)
consist = f_consistent_returns(p)
mom_r = mom.rank(axis=1, pct=True, na_option="keep")
con_r = consist.rank(axis=1, pct=True, na_option="keep")
return 0.4 * mom_r + 0.3 * con_r + 0.3 * f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep")
def f_rec_deep_x_gap(p):
"""Deep recovery × gap-up frequency."""
rec_r = f_rec_126(p).rank(axis=1, pct=True, na_option="keep")
gap_r = f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep")
return rec_r * gap_r
def f_mom_x_gap(p):
"""Momentum × gap-up frequency."""
mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
gap_r = f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep")
return mom_r * gap_r
# Regime-conditional: momentum with volatility filter
def f_mom_low_vol_regime(p):
"""Momentum only when market vol is below median.
Momentum crashes in high-vol regimes."""
mom = f_mom_12_1(p)
mkt_vol = p.pct_change().mean(axis=1).rolling(60).std()
vol_median = mkt_vol.rolling(252, min_periods=126).median()
low_vol = mkt_vol <= vol_median
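    # Broadcast the market-level low-vol flag across all stock columns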
mask = pd.DataFrame(
np.tile(low_vol.values[:, None], (1, mom.shape[1])),
index=mom.index, columns=mom.columns,
)
return mom.where(mask, 0)
# =====================================================================
# Main loop
# =====================================================================
def run_round(
name: str,
factors: list[tuple[str, FactorFunc]],
prices: pd.DataFrame,
top_n: int = 10,
) -> list[dict]:
results = []
for fname, func in factors:
r = test_factor(fname, func, prices, top_n=top_n)
results.append(r)
print_results(results, name)
return results
def run_market(market: str):
config = UNIVERSES[market]
benchmark = config["benchmark"]
prices = data_manager.load(market)
bench = prices[benchmark].dropna() if benchmark in prices.columns else None
stocks = prices.drop(columns=[benchmark], errors="ignore")
print(f"\n{'#'*95}")
print(f" FACTOR DISCOVERY LOOP — {market.upper()} MARKET")
print(f" {stocks.shape[1]} stocks, {stocks.shape[0]} days, "
f"{stocks.index[0].date()}{stocks.index[-1].date()}")
print(f"{'#'*95}")
# Benchmark
if bench is not None:
eq_bench = bench / bench.iloc[0] * 100000
bs = stats(eq_bench)
print(f"\n Benchmark: CAGR {bs['cagr']:+.1%}, Sharpe {bs['sharpe']:.2f}")
# ================================================================
# ROUND 1: Academic & practitioner factors
# ================================================================
r1_factors = [
("BASELINE:rec+mom", f_rec_mom),
# Momentum family
("mom_12_1", f_mom_12_1),
("mom_6_1", f_mom_6_1),
("mom_3_1", f_mom_3_1),
("mom_1_0", f_mom_1_0),
("mom_intermediate_7m", f_mom_intermediate),
("sharpe_momentum", f_sharpe_mom),
# Recovery family
("recovery_63d", f_rec_63),
("recovery_126d", f_rec_126),
("recovery_21d", f_rec_21),
# Trend
("trend_slope_60d", f_trend_slope),
("golden_cross", f_golden_cross),
("above_ma200", f_above_ma200),
# Volatility
("low_vol_60d", f_low_vol_60),
("neg_ivol_20d", f_neg_ivol),
# Reversal
("STR_5d", f_str_5d),
("STR_10d", f_str_10d),
# Quality / accumulation
("consistent_returns", f_consistent_returns),
("up_volume_proxy", f_up_volume_proxy),
("gap_up_freq", f_gap_up_freq),
("info_discrete", f_info_discrete),
("earnings_drift", f_earnings_drift),
# Other academic
("52w_high", f_52w_high),
("anti_max_20d", f_anti_max),
("dd_recovery_rate", f_dd_recovery_rate),
("mom_acceleration", f_mom_accel),
]
if market == "cn":
r1_factors.append(("reversal_vol_cn", f_reversal_vol))
r1 = run_round("ROUND 1 — Academic & Practitioner Hypotheses", r1_factors, stocks)
# Identify top-10 from round 1
    r1_sorted = sorted(r1, key=lambda x: x.get("cagr") if pd.notna(x.get("cagr")) else float("-inf"), reverse=True)
r1_top_names = [r["name"] for r in r1_sorted[:10] if r.get("cagr") and r["cagr"] > 0]
baseline_cagr = next((r["cagr"] for r in r1 if "BASELINE" in r["name"]), 0)
print(f"\n Baseline CAGR: {baseline_cagr:+.1%}")
print(f" Top 10: {r1_top_names}")
# ================================================================
# ROUND 2: Second-order ideas based on what worked
# ================================================================
r2_factors = [
("BASELINE:rec+mom", f_rec_mom),
("smooth_momentum", f_smooth_momentum),
("pos_ratio_60d", f_pos_ratio_60),
("up_down_asymmetry", f_up_down_asymmetry),
("max_streak_40d", f_max_streak),
("up_capture_60d", f_up_capture),
("down_resilience_120d", f_down_resilience),
("rec_mom_filtered", f_rec_mom_filtered),
("sign_ratio", f_sign_ratio),
("autocorrelation_60d", f_autocorrelation),
("mean_rev_z", f_mean_rev_z),
]
r2 = run_round("ROUND 2 — Return Quality & Behavioral Factors", r2_factors, stocks)
# ================================================================
# ROUND 3: Interaction & conditional factors
# ================================================================
r3_factors = [
("BASELINE:rec+mom", f_rec_mom),
("mom×recovery", f_mom_x_recovery),
("mom×upvol", f_mom_x_upvol),
("rec_deep×upvol", f_rec_deep_x_upvol),
("trend×mom", f_trend_x_mom),
("quality_mom", f_quality_mom),
("rec_deep×gap", f_rec_deep_x_gap),
("mom×gap", f_mom_x_gap),
("mom_low_vol_regime", f_mom_low_vol_regime),
]
r3 = run_round("ROUND 3 — Interaction & Conditional Factors", r3_factors, stocks)
# ================================================================
# Collect ALL results from all rounds
# ================================================================
all_results = r1 + r2 + r3
# Deduplicate baseline
seen = set()
unique = []
for r in all_results:
if r["name"] not in seen:
seen.add(r["name"])
unique.append(r)
    unique_sorted = sorted(unique, key=lambda x: x.get("cagr") if pd.notna(x.get("cagr")) else float("-inf"), reverse=True)
print(f"\n{'='*95}")
print(f" ALL ROUNDS COMBINED — TOP 15 FACTORS — {market.upper()}")
print(f"{'='*95}")
print(f" {'Factor':<45} {'CAGR':>7} {'Sharpe':>7} {'Sortino':>8} {'MaxDD':>7} {'Calmar':>7}")
print(f" {'-'*85}")
for r in unique_sorted[:15]:
flag = " <<<" if "BASELINE" in r["name"] else ""
print(f" {r['name']:<45} {r['cagr']:>+6.1%} {r['sharpe']:>7.2f} {r['sortino']:>8.2f} "
f"{r['maxdd']:>+6.1%} {r['calmar']:>7.2f}{flag}")
# ================================================================
# ROUND 4: Combine top non-baseline factors
# ================================================================
top_funcs = {}
func_map_all = dict(r1_factors + r2_factors + r3_factors)
    non_baseline = [r for r in unique_sorted if "BASELINE" not in r["name"] and r.get("cagr") and pd.notna(r["cagr"])]
for r in non_baseline[:12]:
if r["name"] in func_map_all:
top_funcs[r["name"]] = func_map_all[r["name"]]
top_names = list(top_funcs.keys())
print(f"\n Building combinations from top {len(top_names)} factors: {top_names}")
combo_factors = [("BASELINE:rec+mom", f_rec_mom)]
# All pairs from top-8
for i in range(min(8, len(top_names))):
for j in range(i + 1, min(8, len(top_names))):
n1, n2 = top_names[i], top_names[j]
combo_factors.append((
f"{n1[:20]}+{n2[:20]}",
combo([(top_funcs[n1], 0.5), (top_funcs[n2], 0.5)])
))
# Triple combos from top-5
for i in range(min(5, len(top_names))):
for j in range(i + 1, min(5, len(top_names))):
for k in range(j + 1, min(5, len(top_names))):
n1, n2, n3 = top_names[i], top_names[j], top_names[k]
combo_factors.append((
f"{n1[:15]}+{n2[:15]}+{n3[:15]}",
combo([(top_funcs[n1], 0.33), (top_funcs[n2], 0.33), (top_funcs[n3], 0.34)])
))
r4 = run_round("ROUND 4 — Factor Combinations", combo_factors, stocks)
# ================================================================
# ROUND 5: Yearly breakdown of top 5 combos
# ================================================================
    r4_sorted = sorted(r4, key=lambda x: x.get("cagr") if pd.notna(x.get("cagr")) else float("-inf"), reverse=True)
top5 = r4_sorted[:5]
# Make sure baseline is included
base = next((r for r in r4 if "BASELINE" in r["name"]), None)
if base and base not in top5:
top5.append(base)
print(f"\n{'='*95}")
print(f" ROUND 5 — YEARLY RETURNS OF BEST STRATEGIES — {market.upper()}")
print(f"{'='*95}")
cols = [(r["name"], r["equity"]) for r in top5]
if bench is not None:
eq_bench = bench / bench.iloc[0] * 100000
cols.append(("BENCHMARK", eq_bench))
# Header
header = f" {'Year':<6}"
for name, _ in cols:
header += f" | {name[:22]:>22}"
print(header)
print(" " + "-" * (6 + 25 * len(cols)))
all_years = sorted(set(y for _, eq in cols for y in eq.index.year.unique()))
for year in all_years:
line = f" {year:<6}"
for _, eq in cols:
dr = eq.pct_change().fillna(0)
yr = dr[dr.index.year == year]
r = float((1 + yr).prod() - 1) if len(yr) > 0 else 0
line += f" | {r:>+21.1%}"
print(line)
# Period CAGRs
for ny in [3, 5, 10]:
cutoff = stocks.index[-1] - pd.DateOffset(years=ny)
print(f"\n --- {ny}-year CAGR ---")
for name, eq in cols:
sl = eq[eq.index >= cutoff]
if len(sl) < 50:
continue
tot = sl.iloc[-1] / sl.iloc[0] - 1
cagr = (1 + tot) ** (1 / ny) - 1
print(f" {name[:50]:<50} {cagr:>+8.1%}")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--market", default="us", choices=["us", "cn"])
args = parser.parse_args()
run_market(args.market)
if __name__ == "__main__":
main()

factor_real_backtest.py (new file, 449 lines)

@@ -0,0 +1,449 @@
"""
Factor research v2 — Portfolio-first approach.
Instead of IC → portfolio, we go directly to:
1. Build factor signal
2. Select top-N stocks
3. Run real backtest with transaction costs
4. Measure CAGR, Sharpe, MaxDD, yearly returns
Tests single factors AND combinations. Compares everything against
the baseline recovery+momentum strategy.
"""
from __future__ import annotations
import argparse
import warnings
import numpy as np
import pandas as pd
import data_manager
import metrics
from universe import UNIVERSES
warnings.filterwarnings("ignore")
# ---------------------------------------------------------------------------
# Factor signals — each returns DataFrame (dates x stocks), higher = better
# ---------------------------------------------------------------------------
def f_momentum_12_1(p: pd.DataFrame) -> pd.DataFrame:
return p.shift(21).pct_change(231)
def f_recovery(p: pd.DataFrame) -> pd.DataFrame:
return p / p.rolling(63, min_periods=63).min() - 1
def f_recovery_mom(p: pd.DataFrame) -> pd.DataFrame:
"""The baseline composite: 50/50 recovery + momentum ranks."""
r1 = f_recovery(p).rank(axis=1, pct=True, na_option="keep")
r2 = f_momentum_12_1(p).rank(axis=1, pct=True, na_option="keep")
return 0.5 * r1 + 0.5 * r2
# --- New single factors ---
def f_short_term_reversal(p: pd.DataFrame) -> pd.DataFrame:
"""5-day return reversal."""
return -p.pct_change(5)
def f_vol_adjusted_mom(p: pd.DataFrame) -> pd.DataFrame:
"""Momentum divided by recent volatility. Sharpe-like signal.
Hypothesis: risk-adjusted momentum is more persistent."""
mom = p.shift(21).pct_change(231)
vol = p.pct_change().rolling(60, min_periods=40).std()
return mom / vol.replace(0, np.nan)
def f_acceleration(p: pd.DataFrame) -> pd.DataFrame:
"""3-month momentum minus 12-month momentum.
Hypothesis: accelerating stocks continue accelerating."""
mom_3m = p.shift(5).pct_change(63 - 5)
mom_12m = p.shift(21).pct_change(231)
return mom_3m - mom_12m
def f_breakout(p: pd.DataFrame) -> pd.DataFrame:
"""Price relative to 20-day high. Close to 1 = breaking out.
Hypothesis: breakouts from consolidation continue."""
return p / p.rolling(20, min_periods=20).max()
def f_recovery_deep(p: pd.DataFrame) -> pd.DataFrame:
"""Recovery from 126-day (6 month) low instead of 63-day.
Hypothesis: deeper recovery = stronger signal."""
return p / p.rolling(126, min_periods=126).min() - 1
def f_recovery_rate(p: pd.DataFrame) -> pd.DataFrame:
"""Speed of recovery: 20-day change in recovery factor.
Hypothesis: accelerating recovery predicts continuation."""
recovery = p / p.rolling(63, min_periods=63).min() - 1
return recovery - recovery.shift(20)
def f_drawdown_bounce(p: pd.DataFrame) -> pd.DataFrame:
"""20-day return from drawdown trough, only for stocks in drawdown.
Hypothesis: strong bounces from drawdowns persist."""
rolling_max = p.rolling(252, min_periods=126).max()
in_drawdown = p < rolling_max * 0.9 # at least 10% below peak
bounce_20d = p.pct_change(20)
# Only score stocks that were recently in drawdown
was_in_drawdown = in_drawdown.rolling(20, min_periods=1).max().astype(bool)
return bounce_20d.where(was_in_drawdown, np.nan)
def f_consistent_winner(p: pd.DataFrame) -> pd.DataFrame:
"""Fraction of months with positive returns over past 12 months.
Hypothesis: stocks that win consistently are higher quality momentum."""
monthly_ret = p.pct_change(21)
return (monthly_ret > 0).astype(float).rolling(252, min_periods=126).mean()
def f_gap_up_freq(p: pd.DataFrame) -> pd.DataFrame:
"""Fraction of days with >1% gain in past 60 days.
Hypothesis: frequent large gains = institutional buying."""
ret = p.pct_change()
return (ret > 0.01).astype(float).rolling(60, min_periods=40).mean()
def f_low_vol_mom(p: pd.DataFrame) -> pd.DataFrame:
"""Momentum only among low-volatility stocks. Combined rank.
Hypothesis: low-vol momentum is more persistent."""
mom = f_momentum_12_1(p).rank(axis=1, pct=True, na_option="keep")
vol = (-p.pct_change().rolling(60, min_periods=40).std()).rank(axis=1, pct=True, na_option="keep")
return 0.5 * mom + 0.5 * vol
def f_52w_channel_position(p: pd.DataFrame) -> pd.DataFrame:
"""Position within 252-day high-low channel. 1 = at high, 0 = at low.
Hypothesis: stocks near highs continue (anchoring + trend)."""
h = p.rolling(252, min_periods=126).max()
l = p.rolling(252, min_periods=126).min()
return (p - l) / (h - l).replace(0, np.nan)
def f_up_volume_proxy(p: pd.DataFrame) -> pd.DataFrame:
"""Proxy for accumulation: sum of returns on up days over 20 days.
Without volume data, use magnitude of positive returns as proxy."""
ret = p.pct_change()
up_ret = ret.where(ret > 0, 0)
return up_ret.rolling(20, min_periods=15).sum()
def f_relative_strength_ma(p: pd.DataFrame) -> pd.DataFrame:
"""Price above 50-day MA relative to 200-day MA position.
Dual MA trend strength."""
ma50 = p.rolling(50, min_periods=50).mean()
ma200 = p.rolling(200, min_periods=200).mean()
above_50 = (p / ma50 - 1)
above_200 = (p / ma200 - 1)
return 0.5 * above_50 + 0.5 * above_200
def f_earnings_drift_proxy(p: pd.DataFrame) -> pd.DataFrame:
"""Proxy for post-earnings drift using 5-day return spikes.
Identify large 5-day moves and bet on continuation.
Hypothesis: large moves driven by information continue."""
ret_5d = p.pct_change(5)
vol = p.pct_change().rolling(60, min_periods=40).std() * np.sqrt(5)
z_score = ret_5d / vol.replace(0, np.nan)
# Smooth: average z-score over past 60 days to capture multiple events
return z_score.rolling(60, min_periods=20).mean()
# --- A-share specific ---
def f_reversal_vol_cn(p: pd.DataFrame) -> pd.DataFrame:
"""Short-term reversal amplified by volatility.
High-vol oversold stocks bounce harder in A-shares."""
ret_5d = p.pct_change(5)
vol = p.pct_change().rolling(20, min_periods=15).std()
# Oversold (negative return) + high vol = positive score
return -ret_5d * vol
def f_momentum_6_1(p: pd.DataFrame) -> pd.DataFrame:
"""6-1 month momentum. Shorter lookback may work better in A-shares."""
return p.shift(21).pct_change(105)
def f_recovery_narrow(p: pd.DataFrame) -> pd.DataFrame:
"""Recovery from 21-day low. Faster recovery signal for A-shares."""
return p / p.rolling(21, min_periods=21).min() - 1
def f_range_breakout_cn(p: pd.DataFrame) -> pd.DataFrame:
"""Breakout from 60-day range. Tuned for A-share volatility."""
h60 = p.rolling(60, min_periods=40).max()
l60 = p.rolling(60, min_periods=40).min()
mid = (h60 + l60) / 2
rng = (h60 - l60) / mid.replace(0, np.nan)
position = (p - l60) / (h60 - l60).replace(0, np.nan)
# Reward stocks breaking out of narrow ranges
return position / rng.replace(0, np.nan)
# ---------------------------------------------------------------------------
# Strategy builder and backtester
# ---------------------------------------------------------------------------
def make_strategy(
prices: pd.DataFrame,
signal_func,
top_n: int = 10,
rebal_freq: int = 21,
warmup: int = 252,
) -> pd.DataFrame:
"""Turn a factor signal into a rebalanced top-N equal-weight strategy."""
signal = signal_func(prices)
rank = signal.rank(axis=1, ascending=False, na_option="bottom")
n_valid = signal.notna().sum(axis=1)
enough = n_valid >= top_n
top_mask = (rank <= top_n) & enough.values.reshape(-1, 1)
raw = top_mask.astype(float)
row_sums = raw.sum(axis=1).replace(0, np.nan)
weights = raw.div(row_sums, axis=0).fillna(0.0)
    # Rebalance every rebal_freq trading days (monthly by default)
rebal_mask = pd.Series(False, index=prices.index)
rebal_indices = list(range(warmup, len(prices), rebal_freq))
rebal_mask.iloc[rebal_indices] = True
weights[~rebal_mask] = np.nan
weights = weights.ffill().fillna(0.0)
weights.iloc[:warmup] = 0.0
return weights.shift(1).fillna(0.0)
def combo_signal(funcs_and_weights: list[tuple]) -> callable:
"""Create a combined signal function from [(func, weight), ...]."""
def _combo(p: pd.DataFrame) -> pd.DataFrame:
ranked = []
for func, w in funcs_and_weights:
sig = func(p)
ranked.append(w * sig.rank(axis=1, pct=True, na_option="keep"))
return sum(ranked)
return _combo
def run_backtest(
weights: pd.DataFrame,
prices: pd.DataFrame,
cost: float = 0.001,
) -> pd.Series:
"""Vectorized backtest returning equity curve."""
returns = prices.pct_change().fillna(0.0)
port_ret = (weights * returns).sum(axis=1)
turnover = weights.diff().abs().sum(axis=1)
port_ret -= turnover * cost
return (1 + port_ret).cumprod() * 100000
def compute_stats(equity: pd.Series, label: str) -> dict:
"""Compute strategy statistics."""
daily_ret = equity.pct_change().dropna()
if len(daily_ret) < 100 or daily_ret.std() == 0:
return {"name": label, "cagr": np.nan, "sharpe": np.nan, "maxdd": np.nan,
"total": np.nan, "win_rate": np.nan}
n_years = len(daily_ret) / 252
total_ret = equity.iloc[-1] / equity.iloc[0] - 1
cagr = (1 + total_ret) ** (1 / n_years) - 1
sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252)
sortino_denom = daily_ret[daily_ret < 0].std()
sortino = daily_ret.mean() / sortino_denom * np.sqrt(252) if sortino_denom > 0 else 0
running_max = equity.cummax()
maxdd = ((equity - running_max) / running_max).min()
calmar = cagr / abs(maxdd) if maxdd != 0 else 0
win_rate = (daily_ret > 0).mean()
return {
"name": label, "cagr": cagr, "sharpe": sharpe, "sortino": sortino,
"maxdd": maxdd, "calmar": calmar, "total": total_ret, "win_rate": win_rate,
}
def yearly_returns(equity: pd.Series) -> dict[int, float]:
daily_ret = equity.pct_change().fillna(0)
years = daily_ret.index.year
result = {}
for year in sorted(years.unique()):
mask = years == year
result[year] = float((1 + daily_ret[mask]).prod() - 1)
return result
def run(market: str):
config = UNIVERSES[market]
benchmark = config["benchmark"]
print(f"Loading {market.upper()} price data...")
prices = data_manager.load(market)
bench = prices[benchmark].dropna() if benchmark in prices.columns else None
stocks = prices.drop(columns=[benchmark], errors="ignore")
print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} days")
print(f"Period: {stocks.index[0].date()} to {stocks.index[-1].date()}\n")
# --- Define all strategies to test ---
strategies: list[tuple[str, callable]] = []
# Baseline
strategies.append(("BASELINE: recovery+mom", f_recovery_mom))
# Single factors
strategies.append(("momentum_12_1", f_momentum_12_1))
strategies.append(("recovery", f_recovery))
strategies.append(("vol_adj_momentum", f_vol_adjusted_mom))
strategies.append(("acceleration", f_acceleration))
strategies.append(("breakout_20d", f_breakout))
strategies.append(("recovery_deep_126d", f_recovery_deep))
strategies.append(("recovery_rate", f_recovery_rate))
strategies.append(("drawdown_bounce", f_drawdown_bounce))
strategies.append(("consistent_winner", f_consistent_winner))
strategies.append(("gap_up_freq", f_gap_up_freq))
strategies.append(("low_vol_momentum", f_low_vol_mom))
strategies.append(("52w_channel_position", f_52w_channel_position))
strategies.append(("up_volume_proxy", f_up_volume_proxy))
strategies.append(("relative_strength_ma", f_relative_strength_ma))
strategies.append(("earnings_drift_proxy", f_earnings_drift_proxy))
if market == "cn":
strategies.append(("reversal_vol_cn", f_reversal_vol_cn))
strategies.append(("momentum_6_1", f_momentum_6_1))
strategies.append(("recovery_narrow_21d", f_recovery_narrow))
strategies.append(("range_breakout_cn", f_range_breakout_cn))
# Run all single-factor backtests
print("=" * 110)
print(f" SINGLE FACTOR BACKTESTS — {market.upper()} (Top 10, monthly rebal, 10bps cost)")
print("=" * 110)
results = []
equities = {}
for name, func in strategies:
print(f" Running: {name}...")
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks)
equities[name] = eq
results.append(compute_stats(eq, name))
# Benchmark
if bench is not None:
eq_bench = bench / bench.iloc[0] * 100000
equities["BENCHMARK"] = eq_bench
results.append(compute_stats(eq_bench, "BENCHMARK"))
# Print results table
df = pd.DataFrame(results).set_index("name")
df = df.sort_values("cagr", ascending=False)
print(f"\n{'Strategy':<30} {'CAGR':>8} {'Sharpe':>8} {'Sortino':>8} {'MaxDD':>8} {'Calmar':>8} {'Total':>10}")
print("-" * 90)
for name, row in df.iterrows():
flag = " ***" if name == "BASELINE: recovery+mom" else ""
print(f"{name:<30} {row['cagr']:>+7.1%} {row['sharpe']:>8.2f} {row['sortino']:>8.2f} "
f"{row['maxdd']:>+7.1%} {row['calmar']:>8.2f} {row['total']:>+9.0%}{flag}")
# --- Identify factors that beat or match baseline ---
baseline_cagr = df.loc["BASELINE: recovery+mom", "cagr"]
winners = df[df["cagr"] >= baseline_cagr * 0.8].index.tolist()
winners = [w for w in winners if w not in ("BASELINE: recovery+mom", "BENCHMARK")]
print(f"\nFactors within 80% of baseline CAGR ({baseline_cagr:.1%}): {winners}")
# --- Test combinations of top performers ---
print(f"\n{'='*110}")
print(f" FACTOR COMBINATIONS — {market.upper()}")
print(f"{'='*110}")
# Get top single factors
single_only = df.drop(["BASELINE: recovery+mom", "BENCHMARK"], errors="ignore")
top_singles = single_only.nlargest(8, "cagr").index.tolist()
print(f" Top 8 singles: {top_singles}\n")
# Map names back to functions
func_map = dict(strategies)
combos: list[tuple[str, callable]] = []
# Baseline is always included
combos.append(("BASELINE: recovery+mom", f_recovery_mom))
# Top2 combinations
for i in range(min(6, len(top_singles))):
for j in range(i + 1, min(6, len(top_singles))):
n1, n2 = top_singles[i], top_singles[j]
label = f"{n1} + {n2}"
func = combo_signal([(func_map[n1], 0.5), (func_map[n2], 0.5)])
combos.append((label, func))
# Recovery+mom + each top single (3-factor)
for name in top_singles[:6]:
if name in ("momentum_12_1", "recovery"):
continue
label = f"rec+mom + {name}"
func = combo_signal([
(f_recovery, 0.33), (f_momentum_12_1, 0.33), (func_map[name], 0.34)
])
combos.append((label, func))
# Run combo backtests
combo_results = []
for name, func in combos:
print(f" Running: {name}...")
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks)
equities[name] = eq
combo_results.append(compute_stats(eq, name))
combo_df = pd.DataFrame(combo_results).set_index("name")
combo_df = combo_df.sort_values("cagr", ascending=False)
print(f"\n{'Combo':<55} {'CAGR':>8} {'Sharpe':>8} {'Sortino':>8} {'MaxDD':>8} {'Calmar':>8}")
print("-" * 105)
for name, row in combo_df.iterrows():
flag = " ***" if name == "BASELINE: recovery+mom" else ""
print(f"{name:<55} {row['cagr']:>+7.1%} {row['sharpe']:>8.2f} {row['sortino']:>8.2f} "
f"{row['maxdd']:>+7.1%} {row['calmar']:>8.2f}{flag}")
# --- Yearly breakdown for top 3 combos ---
top3 = combo_df.nlargest(3, "cagr").index.tolist()
if "BASELINE: recovery+mom" not in top3:
top3.append("BASELINE: recovery+mom")
print(f"\n{'='*110}")
print(f" YEARLY RETURNS — TOP STRATEGIES vs BASELINE — {market.upper()}")
print(f"{'='*110}")
yr_data = {}
for name in top3:
yr_data[name] = yearly_returns(equities[name])
if bench is not None:
yr_data["BENCHMARK"] = yearly_returns(equities["BENCHMARK"])
all_years = sorted(set(y for yd in yr_data.values() for y in yd.keys()))
# Print header
col_names = top3 + (["BENCHMARK"] if bench is not None else [])
header = f" {'Year':<6}"
for c in col_names:
header += f" | {c[:25]:>25}"
print(header)
print(" " + "-" * (6 + 28 * len(col_names)))
for year in all_years:
line = f" {year:<6}"
for c in col_names:
r = yr_data.get(c, {}).get(year, 0)
line += f" | {r:>+24.1%}"
print(line)
# Compute period summaries
for n_years in [3, 5, 10]:
cutoff = stocks.index[-1] - pd.DateOffset(years=n_years)
print(f"\n --- {n_years}-year CAGR ---")
for name in col_names:
eq = equities.get(name)
if eq is None:
continue
eq_slice = eq[eq.index >= cutoff]
if len(eq_slice) < 50:
continue
total = eq_slice.iloc[-1] / eq_slice.iloc[0] - 1
cagr = (1 + total) ** (1 / n_years) - 1
print(f" {name[:40]:<40} {cagr:>+8.1%}")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--market", default="us", choices=["us", "cn"])
args = parser.parse_args()
run(args.market)
if __name__ == "__main__":
main()

factor_research.py (new file, 547 lines)

@@ -0,0 +1,547 @@
"""
Factor Research Script — Professional QR-style factor mining.
Tests candidate alpha factors using:
- Information Coefficient (IC): rank correlation of signal vs forward returns
- IC Information Ratio (ICIR): mean(IC) / std(IC), measures signal consistency
- Quintile long-short spread: monotonicity of returns across signal buckets
- Turnover: daily rank change, proxy for trading cost
- Decay profile: IC at 1d, 5d, 10d, 20d horizons
- Sub-period stability: IC consistency across rolling windows
- Factor correlation matrix: ensures new factors are orthogonal to known ones
Usage:
uv run python factor_research.py --market us
uv run python factor_research.py --market cn
"""
from __future__ import annotations
import argparse
import warnings
import numpy as np
import pandas as pd
import data_manager
from universe import UNIVERSES
warnings.filterwarnings("ignore", category=FutureWarning)
HORIZONS = [1, 5, 10, 20]
# ---------------------------------------------------------------------------
# Factor definitions — each returns a DataFrame (dates x stocks) of scores
# ---------------------------------------------------------------------------
def _safe_rank(df: pd.DataFrame) -> pd.DataFrame:
return df.rank(axis=1, pct=True, na_option="keep")
def _rolling_ret(prices: pd.DataFrame, window: int) -> pd.DataFrame:
return prices.pct_change(window)
# --- Known factors (baselines) ---
def factor_momentum_12_1(prices: pd.DataFrame) -> pd.DataFrame:
"""Classic 12-1 month momentum."""
return prices.shift(21).pct_change(231)
def factor_recovery(prices: pd.DataFrame) -> pd.DataFrame:
"""Price / 63-day low - 1."""
return prices / prices.rolling(63, min_periods=63).min() - 1
def factor_inverse_vol(prices: pd.DataFrame) -> pd.DataFrame:
"""Negative 60-day realized volatility (low vol = high score)."""
return -prices.pct_change().rolling(60, min_periods=60).std()
# --- NEW candidate factors ---
def factor_short_term_reversal(prices: pd.DataFrame) -> pd.DataFrame:
"""5-day return reversal. Hypothesis: short-term mean reversion."""
return -prices.pct_change(5)
def factor_idio_vol_change(prices: pd.DataFrame) -> pd.DataFrame:
"""Change in idiosyncratic volatility (20d vs 60d).
Hypothesis: declining vol = stabilizing, predicts positive returns."""
ret = prices.pct_change()
vol_20 = ret.rolling(20, min_periods=20).std()
vol_60 = ret.rolling(60, min_periods=60).std()
return -(vol_20 / vol_60.replace(0, np.nan) - 1) # negative = vol declining
def factor_volume_price_divergence(prices: pd.DataFrame, volume: pd.DataFrame | None = None) -> pd.DataFrame:
"""Price up but momentum fading — proxy via acceleration.
Without volume data, use return acceleration as proxy."""
ret_5 = prices.pct_change(5)
ret_20 = prices.pct_change(20)
return ret_5 - (ret_20 / 4) # recent 5-day return outpacing the pro-rated 20-day trend
def factor_max_drawdown_recovery(prices: pd.DataFrame) -> pd.DataFrame:
"""How much of the 60-day max drawdown has been recovered.
Hypothesis: stocks that recover from drawdowns continue recovering."""
rolling_max = prices.rolling(60, min_periods=60).max()
drawdown = prices / rolling_max - 1 # negative
rolling_min_dd = drawdown.rolling(60, min_periods=20).min() # worst drawdown
recovery_pct = drawdown / rolling_min_dd.replace(0, np.nan)
return recovery_pct # closer to 0 = more recovered
def factor_skewness(prices: pd.DataFrame) -> pd.DataFrame:
"""Negative 20-day return skewness.
Hypothesis: negatively skewed stocks are overpriced (lottery preference)."""
ret = prices.pct_change()
return -ret.rolling(20, min_periods=20).skew()
def factor_high_low_range(prices: pd.DataFrame) -> pd.DataFrame:
"""20-day high-low range relative to price.
Hypothesis: narrow range = consolidation, breakout ahead."""
high_20 = prices.rolling(20, min_periods=20).max()
low_20 = prices.rolling(20, min_periods=20).min()
mid = (high_20 + low_20) / 2
return -(high_20 - low_20) / mid.replace(0, np.nan) # negative range = narrow = high score
def factor_mean_reversion_residual(prices: pd.DataFrame) -> pd.DataFrame:
"""Distance from 20-day MA as fraction of 60-day vol.
Hypothesis: stocks far below MA revert. Z-score style."""
ma_20 = prices.rolling(20, min_periods=20).mean()
vol_60 = prices.pct_change().rolling(60, min_periods=60).std() * prices
return -(prices - ma_20) / vol_60.replace(0, np.nan) # below MA = high score
def factor_up_down_vol_ratio(prices: pd.DataFrame) -> pd.DataFrame:
"""Ratio of upside to downside semi-deviation (20d).
Hypothesis: stocks with more upside vol have positive momentum."""
ret = prices.pct_change()
up_vol = ret.where(ret > 0, 0).rolling(20, min_periods=15).std()
down_vol = ret.where(ret < 0, 0).rolling(20, min_periods=15).std()
return up_vol / down_vol.replace(0, np.nan)
def factor_consecutive_up_days(prices: pd.DataFrame) -> pd.DataFrame:
"""Fraction of positive return days in last 10 days.
Hypothesis: persistent winners keep winning (short-term)."""
ret = prices.pct_change()
return (ret > 0).astype(float).rolling(10, min_periods=10).mean()
def factor_gap_momentum(prices: pd.DataFrame) -> pd.DataFrame:
"""Path-smoothness proxy: compounded 20-day return minus the sum of 1-day
returns over the same window (the gap approximates variance drag).
Hypothesis: smooth consistent returns beat volatile ones."""
ret_1d = prices.pct_change()
sum_ret_20d = ret_1d.rolling(20, min_periods=20).mean() * 20
raw_20d = prices.pct_change(20)
return raw_20d - sum_ret_20d # smaller variance drag = smoother path = higher score
def factor_recovery_acceleration(prices: pd.DataFrame) -> pd.DataFrame:
"""Rate of change of recovery factor.
Hypothesis: accelerating recovery is stronger signal than level."""
recovery = prices / prices.rolling(63, min_periods=63).min() - 1
return recovery.pct_change(5)
def factor_trend_strength(prices: pd.DataFrame) -> pd.DataFrame:
"""R-squared of log-price vs time over 60 days.
Hypothesis: stocks trending linearly (high R2) continue."""
log_p = np.log(prices.replace(0, np.nan))
def _r2(series):
y = series.dropna().values
if len(y) < 30:
return np.nan
x = np.arange(len(y), dtype=float)
x -= x.mean()
y_dm = y - y.mean()
ss_xy = (x * y_dm).sum()
ss_xx = (x * x).sum()
ss_yy = (y_dm * y_dm).sum()
if ss_xx == 0 or ss_yy == 0:
return np.nan
r2 = (ss_xy ** 2) / (ss_xx * ss_yy)
slope = ss_xy / ss_xx
return r2 if slope > 0 else -r2 # sign by direction
return log_p.rolling(60, min_periods=30).apply(_r2, raw=False)
def factor_relative_volume_momentum(prices: pd.DataFrame) -> pd.DataFrame:
"""Price momentum weighted by how 'cheap' a stock is relative to 52-week range.
Hypothesis: momentum in stocks near lows is more likely to persist."""
mom_20 = prices.pct_change(20)
high_252 = prices.rolling(252, min_periods=126).max()
low_252 = prices.rolling(252, min_periods=126).min()
position_in_range = (prices - low_252) / (high_252 - low_252).replace(0, np.nan)
return mom_20 * (1 - position_in_range) # momentum * cheapness
def factor_52w_high_distance(prices: pd.DataFrame) -> pd.DataFrame:
"""Distance from 52-week high.
Hypothesis: stocks near their highs continue (anchoring bias)."""
high_252 = prices.rolling(252, min_periods=126).max()
return prices / high_252 # closer to 1 = near high
def factor_downside_beta_proxy(prices: pd.DataFrame) -> pd.DataFrame:
"""Proxy for downside beta using co-movement on market down days.
Hypothesis: low downside beta outperforms (asymmetric risk)."""
ret = prices.pct_change()
market_ret = ret.mean(axis=1)
down_days = market_ret < 0
# Mask non-down-day returns to NaN, then rolling mean
# Use numpy for correct broadcasting, wider window (120d) so ~54 down
# days are available, well above min_periods=20
arr = ret.values.copy()
arr[~down_days.values, :] = np.nan
down_ret = pd.DataFrame(arr, index=ret.index, columns=ret.columns)
avg_down = down_ret.rolling(120, min_periods=20).mean()
return -avg_down # negative = less downside = good
# --- A-share specific factors ---
def factor_liquidity_premium(prices: pd.DataFrame) -> pd.DataFrame:
"""Amihud illiquidity proxy (using returns only, no volume).
Hypothesis: illiquid stocks earn premium in A-shares (retail driven)."""
ret = prices.pct_change()
# Use absolute return as illiquidity proxy (higher abs ret = less liquid)
illiq = ret.abs().rolling(20, min_periods=15).mean()
return illiq
def factor_lottery_demand(prices: pd.DataFrame) -> pd.DataFrame:
"""Max daily return in past 20 days (negative).
Hypothesis: lottery stocks (high max return) underperform.
Strong in A-shares due to retail speculation."""
ret = prices.pct_change()
return -ret.rolling(20, min_periods=15).max()
def factor_turnover_reversal(prices: pd.DataFrame) -> pd.DataFrame:
"""Interaction of short-term returns with volatility.
High vol + negative return = oversold bounce candidate.
Common A-share alpha source."""
ret_5 = prices.pct_change(5)
vol_20 = prices.pct_change().rolling(20, min_periods=15).std()
return -ret_5 * vol_20 # oversold + high vol = positive
def factor_price_level(prices: pd.DataFrame) -> pd.DataFrame:
"""Negative absolute price level.
Hypothesis: low-priced stocks attract retail in A-shares (penny stock effect)."""
return -prices
# ---------------------------------------------------------------------------
# IC and analytics engine
# ---------------------------------------------------------------------------
def compute_ic(
signal: pd.DataFrame,
forward_ret: pd.DataFrame,
) -> pd.Series:
"""Cross-sectional rank IC (Spearman) per day."""
common_idx = signal.index.intersection(forward_ret.index)
common_cols = signal.columns.intersection(forward_ret.columns)
sig = signal.loc[common_idx, common_cols]
fwd = forward_ret.loc[common_idx, common_cols]
ics = []
for date in common_idx:
s = sig.loc[date].dropna()
f = fwd.loc[date].dropna()
common = s.index.intersection(f.index)
if len(common) < 30:
continue
ic = s[common].corr(f[common], method="spearman")
if np.isfinite(ic):
ics.append((date, ic))
if not ics:
return pd.Series(dtype=float)
return pd.Series(dict(ics))
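# Example (editor's sketch): mean IC and ICIR of the recovery factor at the
# 5-day horizon. Forward returns are shifted back so that the signal on day t
# is scored against the return earned over the following 5 sessions:
#
# fwd_5d = prices.pct_change(5).shift(-5)
# ic = compute_ic(factor_recovery(prices), fwd_5d)
# print(ic.mean(), ic.mean() / ic.std()) # IC_5d, ICIR_5d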
def compute_quintile_returns(
signal: pd.DataFrame,
forward_ret: pd.DataFrame,
n_quantiles: int = 5,
) -> pd.DataFrame:
"""Average forward return by signal quintile, per day, then time-averaged."""
common_idx = signal.index.intersection(forward_ret.index)
common_cols = signal.columns.intersection(forward_ret.columns)
sig = signal.loc[common_idx, common_cols]
fwd = forward_ret.loc[common_idx, common_cols]
records = []
for date in common_idx:
s = sig.loc[date].dropna()
f = fwd.loc[date].dropna()
common = s.index.intersection(f.index)
if len(common) < 50:
continue
scores = s[common]
rets = f[common]
try:
quintile = pd.qcut(scores, n_quantiles, labels=False, duplicates="drop")
except ValueError:
continue
for q in range(n_quantiles):
mask = quintile == q
if mask.sum() > 0:
records.append({"date": date, "quintile": q + 1, "return": rets[mask].mean()})
if not records:
return pd.DataFrame()
df = pd.DataFrame(records)
# Annualize assuming the 5-day forward horizon analyze_factor passes in
return df.groupby("quintile")["return"].mean() * (252 / 5)
def compute_turnover(signal: pd.DataFrame) -> float:
"""Average daily rank change (turnover proxy)."""
ranked = signal.rank(axis=1, pct=True, na_option="keep")
daily_change = ranked.diff().abs().mean(axis=1)
return float(daily_change.mean())
def compute_factor_correlation(factors: dict[str, pd.DataFrame]) -> pd.DataFrame:
"""Cross-sectional IC correlation between all factor pairs."""
names = list(factors.keys())
n = len(names)
corr_matrix = pd.DataFrame(np.nan, index=names, columns=names)
# Use time-series of rank-averaged signals
avg_ranks = {}
for name, sig in factors.items():
ranked = sig.rank(axis=1, pct=True, na_option="keep")
avg_ranks[name] = ranked.mean(axis=1).dropna()
for i in range(n):
for j in range(i, n):
s1 = avg_ranks[names[i]]
s2 = avg_ranks[names[j]]
common = s1.index.intersection(s2.index)
if len(common) > 100:
c = s1[common].corr(s2[common])
corr_matrix.loc[names[i], names[j]] = c
corr_matrix.loc[names[j], names[i]] = c
if i == j:
corr_matrix.loc[names[i], names[j]] = 1.0
return corr_matrix
def analyze_factor(
name: str,
signal: pd.DataFrame,
prices: pd.DataFrame,
horizons: list[int] | None = None,
) -> dict:
"""Full single-factor analysis."""
if horizons is None:
horizons = HORIZONS
results = {"name": name}
# Forward returns at each horizon
for h in horizons:
fwd_ret = prices.pct_change(h).shift(-h)
ic_series = compute_ic(signal, fwd_ret)
if len(ic_series) == 0:
results[f"ic_{h}d"] = np.nan
results[f"icir_{h}d"] = np.nan
continue
ic_mean = ic_series.mean()
ic_std = ic_series.std()
icir = ic_mean / ic_std if ic_std > 0 else 0.0
results[f"ic_{h}d"] = ic_mean
results[f"icir_{h}d"] = icir
if h == 1:
results["ic_1d_series"] = ic_series
# Quintile analysis at 5-day horizon
fwd_5d = prices.pct_change(5).shift(-5)
quintiles = compute_quintile_returns(signal, fwd_5d)
if not quintiles.empty:
results["q5_return"] = float(quintiles.iloc[-1]) # top quintile
results["q1_return"] = float(quintiles.iloc[0]) # bottom quintile
results["long_short_ann"] = float(quintiles.iloc[-1] - quintiles.iloc[0])
results["monotonicity"] = float(quintiles.corr(pd.Series(range(1, len(quintiles) + 1), index=quintiles.index)))
results["quintile_returns"] = quintiles
else:
results["q5_return"] = np.nan
results["q1_return"] = np.nan
results["long_short_ann"] = np.nan
results["monotonicity"] = np.nan
# Turnover
results["turnover"] = compute_turnover(signal)
# Sub-period IC stability (rolling 252-day IC mean)
if "ic_1d_series" in results and len(results["ic_1d_series"]) > 252:
rolling_ic = results["ic_1d_series"].rolling(252).mean().dropna()
results["ic_stability"] = float((rolling_ic > 0).mean()) # fraction of time IC > 0
results["ic_worst_year"] = float(rolling_ic.min())
results["ic_best_year"] = float(rolling_ic.max())
else:
results["ic_stability"] = np.nan
results["ic_worst_year"] = np.nan
results["ic_best_year"] = np.nan
return results
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def get_all_factors(prices: pd.DataFrame, market: str) -> dict[str, pd.DataFrame]:
"""Build all candidate factor signals."""
factors = {}
# Known baselines
factors["momentum_12_1"] = factor_momentum_12_1(prices)
factors["recovery"] = factor_recovery(prices)
factors["inverse_vol"] = factor_inverse_vol(prices)
# New candidates — universal
factors["short_term_reversal"] = factor_short_term_reversal(prices)
factors["idio_vol_change"] = factor_idio_vol_change(prices)
factors["return_acceleration"] = factor_volume_price_divergence(prices)
factors["drawdown_recovery"] = factor_max_drawdown_recovery(prices)
factors["neg_skewness"] = factor_skewness(prices)
factors["range_compression"] = factor_high_low_range(prices)
factors["mean_rev_zscore"] = factor_mean_reversion_residual(prices)
factors["up_down_vol_ratio"] = factor_up_down_vol_ratio(prices)
factors["win_streak"] = factor_consecutive_up_days(prices)
factors["smooth_momentum"] = factor_gap_momentum(prices)
factors["recovery_accel"] = factor_recovery_acceleration(prices)
factors["trend_r2"] = factor_trend_strength(prices)
factors["cheap_momentum"] = factor_relative_volume_momentum(prices)
factors["near_52w_high"] = factor_52w_high_distance(prices)
factors["low_downside_beta"] = factor_downside_beta_proxy(prices)
# A-share specific (also test on US for comparison)
if market == "cn":
factors["illiquidity"] = factor_liquidity_premium(prices)
factors["anti_lottery"] = factor_lottery_demand(prices)
factors["vol_reversal"] = factor_turnover_reversal(prices)
factors["low_price"] = factor_price_level(prices)
return factors
def print_summary_table(results: list[dict], market: str) -> None:
"""Print a ranked summary of all factors."""
rows = []
for r in results:
rows.append({
"Factor": r["name"],
"IC_1d": r.get("ic_1d", np.nan),
"ICIR_1d": r.get("icir_1d", np.nan),
"IC_5d": r.get("ic_5d", np.nan),
"ICIR_5d": r.get("icir_5d", np.nan),
"IC_20d": r.get("ic_20d", np.nan),
"ICIR_20d": r.get("icir_20d", np.nan),
"LS_5d_ann": r.get("long_short_ann", np.nan),
"Mono": r.get("monotonicity", np.nan),
"Turnover": r.get("turnover", np.nan),
"IC_stab": r.get("ic_stability", np.nan),
"IC_worst_yr": r.get("ic_worst_year", np.nan),
})
df = pd.DataFrame(rows).set_index("Factor")
df = df.sort_values("ICIR_5d", ascending=False)
print(f"\n{'='*100}")
print(f" FACTOR RESEARCH RESULTS — {market.upper()} MARKET")
print(f"{'='*100}")
print("\nRanked by 5-day ICIR (most important metric for tradeable alpha):\n")
print(df.round(4).to_string())
# Highlight top factors
print(f"\n{'='*100}")
print(" TOP FACTORS (ICIR_5d > 0.05 and IC_stability > 0.6)")
print(f"{'='*100}")
top = df[(df["ICIR_5d"].abs() > 0.05) & (df["IC_stab"] > 0.6)]
if top.empty:
top = df.head(5)
print(" (No factor met strict threshold; showing top 5 by ICIR_5d)")
print(top.round(4).to_string())
# Quintile details for top factors
print(f"\n{'='*100}")
print(" QUINTILE RETURN PROFILES (annualized, 5-day forward)")
print(f"{'='*100}")
for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:8]:
qr = r.get("quintile_returns")
if qr is not None and not qr.empty:
q_str = " ".join(f"Q{int(k)}: {v:+.1%}" for k, v in qr.items())
ls = r.get("long_short_ann", 0)
print(f" {r['name']:25s} | {q_str} | L/S: {ls:+.1%}")
def main():
parser = argparse.ArgumentParser(description="Factor research")
parser.add_argument("--market", default="us", choices=["us", "cn"])
parser.add_argument("--years", type=int, default=None, help="Limit to last N years")
args = parser.parse_args()
market = args.market
config = UNIVERSES[market]
benchmark = config["benchmark"]
print(f"Loading {market.upper()} price data...")
prices = data_manager.load(market)
# Remove benchmark from stock universe
stocks = prices.drop(columns=[benchmark], errors="ignore")
if args.years:
cutoff = stocks.index[-1] - pd.DateOffset(years=args.years)
stocks = stocks[stocks.index >= cutoff]
print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} trading days")
print(f"Date range: {stocks.index[0].date()} to {stocks.index[-1].date()}")
# Build all factor signals
print("\nComputing factor signals...")
factors = get_all_factors(stocks, market)
# Analyze each factor
print("Running factor analysis (this may take a few minutes)...")
results = []
for name, signal in factors.items():
print(f" Analyzing: {name}...")
r = analyze_factor(name, signal, stocks)
results.append(r)
# Print results
print_summary_table(results, market)
# Factor correlation matrix
print(f"\n{'='*100}")
print(" FACTOR CORRELATION MATRIX (rank-averaged cross-sectional)")
print(f"{'='*100}")
corr = compute_factor_correlation(factors)
# Show only top factors
top_names = [r["name"] for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:10]]
top_names_present = [n for n in top_names if n in corr.index]
print(corr.loc[top_names_present, top_names_present].round(2).to_string())
if __name__ == "__main__":
main()

factor_robustness.py (new file, 323 lines)

@@ -0,0 +1,323 @@
"""
Robustness checks for winning factor strategies.
Tests:
1. Rolling 2-year window performance (stability)
2. Top-N sensitivity (5, 10, 15, 20)
3. Rebalance frequency sensitivity (5d, 10d, 21d, 42d)
4. Transaction cost sensitivity (0, 10bps, 20bps, 50bps)
5. Drawdown analysis
"""
from __future__ import annotations
import argparse
import warnings
import numpy as np
import pandas as pd
import data_manager
from universe import UNIVERSES
from factor_real_backtest import (
f_recovery_mom,
f_momentum_12_1,
f_recovery,
f_recovery_deep,
f_up_volume_proxy,
f_gap_up_freq,
f_earnings_drift_proxy,
f_reversal_vol_cn,
f_consistent_winner,
combo_signal,
make_strategy,
run_backtest,
compute_stats,
)
warnings.filterwarnings("ignore")
def rolling_window_performance(equity: pd.Series, window_years: int = 2):
"""Compute rolling window returns."""
daily_ret = equity.pct_change().dropna()
window = 252 * window_years
results = []
for end_idx in range(window, len(daily_ret), 63): # step 3 months
start_idx = end_idx - window
chunk = daily_ret.iloc[start_idx:end_idx]
total = (1 + chunk).prod() - 1
ann = (1 + total) ** (252 / len(chunk)) - 1
sharpe = chunk.mean() / chunk.std() * np.sqrt(252) if chunk.std() > 0 else 0
results.append({
"end_date": chunk.index[-1].date(),
"ann_return": ann,
"sharpe": sharpe,
})
return pd.DataFrame(results)
def drawdown_analysis(equity: pd.Series) -> pd.DataFrame:
"""Find top 5 drawdown episodes."""
running_max = equity.cummax()
drawdown = (equity - running_max) / running_max
# Find drawdown episodes
episodes = []
in_dd = False
start = None
for i in range(len(drawdown)):
if drawdown.iloc[i] < -0.05 and not in_dd:
in_dd = True
start = i
elif drawdown.iloc[i] >= 0 and in_dd:
in_dd = False
trough_idx = drawdown.iloc[start:i].idxmin()
episodes.append({
"start": drawdown.index[start].date(),
"trough": trough_idx.date(),
"end": drawdown.index[i].date(),
"depth": drawdown.loc[trough_idx],
"duration_days": i - start,
})
# Handle ongoing drawdown
if in_dd:
trough_idx = drawdown.iloc[start:].idxmin()
episodes.append({
"start": drawdown.index[start].date(),
"trough": trough_idx.date(),
"end": "ongoing",
"depth": drawdown.loc[trough_idx],
"duration_days": len(drawdown) - start,
})
df = pd.DataFrame(episodes)
if df.empty:
return df
return df.nsmallest(5, "depth")
def run_us(stocks: pd.DataFrame):
print("=" * 100)
print(" US ROBUSTNESS — Winner: momentum_12_1 + up_volume_proxy")
print("=" * 100)
winner_func = combo_signal([(f_momentum_12_1, 0.5), (f_up_volume_proxy, 0.5)])
baseline_func = f_recovery_mom
# 1. Rolling 2-year performance
print("\n--- 1. Rolling 2-Year Performance ---\n")
for label, func in [("Winner: mom+upvol", winner_func),
("Baseline: rec+mom", baseline_func)]:
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks)
roll = rolling_window_performance(eq)
if roll.empty:
continue
win_pct = (roll["ann_return"] > 0).mean()
print(f" {label}:")
print(f" Mean 2yr ann return: {roll['ann_return'].mean():+.1%}")
print(f" Min 2yr ann return: {roll['ann_return'].min():+.1%}")
print(f" Max 2yr ann return: {roll['ann_return'].max():+.1%}")
print(f" % positive 2yr: {win_pct:.0%}")
print(f" Mean 2yr Sharpe: {roll['sharpe'].mean():.2f}")
print()
# 2. Top-N sensitivity
print("--- 2. Top-N Sensitivity ---\n")
header = f" {'Top-N':<8}"
for label in ["Winner: mom+upvol", "Baseline: rec+mom"]:
header += f" | {'CAGR':>8} {'Sharpe':>8} {'MaxDD':>8}"
print(header)
print(" " + "-" * 70)
for top_n in [5, 10, 15, 20, 30]:
line = f" {top_n:<8}"
for func in [winner_func, baseline_func]:
w = make_strategy(stocks, func, top_n=top_n)
eq = run_backtest(w, stocks)
s = compute_stats(eq, "")
line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f} {s['maxdd']:>+7.1%}"
print(line)
# 3. Rebalance frequency sensitivity
print("\n--- 3. Rebalance Frequency Sensitivity ---\n")
header = f" {'Rebal':<8}"
for label in ["Winner: mom+upvol", "Baseline: rec+mom"]:
header += f" | {'CAGR':>8} {'Sharpe':>8} {'MaxDD':>8}"
print(header)
print(" " + "-" * 70)
for rebal in [5, 10, 21, 42, 63]:
line = f" {rebal}d{'':<5}"
for func in [winner_func, baseline_func]:
w = make_strategy(stocks, func, top_n=10, rebal_freq=rebal)
eq = run_backtest(w, stocks)
s = compute_stats(eq, "")
line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f} {s['maxdd']:>+7.1%}"
print(line)
# 4. Transaction cost sensitivity
print("\n--- 4. Transaction Cost Sensitivity ---\n")
header = f" {'Cost':<8}"
for label in ["Winner: mom+upvol", "Baseline: rec+mom"]:
header += f" | {'CAGR':>8} {'Sharpe':>8}"
print(header)
print(" " + "-" * 50)
for cost in [0, 0.001, 0.002, 0.005]:
line = f" {cost*10000:.0f}bps{'':<4}"
for func in [winner_func, baseline_func]:
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks, cost=cost)
s = compute_stats(eq, "")
line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f}"
print(line)
# 5. Drawdown analysis
print("\n--- 5. Drawdown Episodes ---\n")
for label, func in [("Winner: mom+upvol", winner_func),
("Baseline: rec+mom", baseline_func)]:
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks)
dd = drawdown_analysis(eq)
print(f" {label}:")
if dd.empty:
print(" No significant drawdowns")
else:
for _, row in dd.iterrows():
print(f" {row['start']}{row['trough']}{row['end']}: "
f"{row['depth']:+.1%} ({row['duration_days']}d)")
print()
# 6. Also test the runner-up combos
print("--- 6. Other Strong Combos (Top-10, 21d rebal, 10bps) ---\n")
other_combos = [
("rec_deep+upvol", combo_signal([(f_recovery_deep, 0.5), (f_up_volume_proxy, 0.5)])),
("rec_deep+mom", combo_signal([(f_recovery_deep, 0.5), (f_momentum_12_1, 0.5)])),
("mom+gap_up", combo_signal([(f_momentum_12_1, 0.5), (f_gap_up_freq, 0.5)])),
("rec_deep+upvol+mom", combo_signal([(f_recovery_deep, 0.33), (f_up_volume_proxy, 0.33), (f_momentum_12_1, 0.34)])),
("mom+upvol+gap", combo_signal([(f_momentum_12_1, 0.33), (f_up_volume_proxy, 0.33), (f_gap_up_freq, 0.34)])),
]
for label, func in other_combos:
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks)
s = compute_stats(eq, "")
print(f" {label:<25} CAGR: {s['cagr']:>+7.1%} Sharpe: {s['sharpe']:.2f} MaxDD: {s['maxdd']:>+7.1%} Calmar: {s['calmar']:.2f}")
def run_cn(stocks: pd.DataFrame):
print("\n" + "=" * 100)
print(" CN ROBUSTNESS — Winners: reversal_vol + gap_up, earn_drift + reversal_vol")
print("=" * 100)
winner1_func = combo_signal([(f_reversal_vol_cn, 0.5), (f_gap_up_freq, 0.5)])
winner2_func = combo_signal([(f_earnings_drift_proxy, 0.5), (f_reversal_vol_cn, 0.5)])
baseline_func = f_recovery_mom
# 1. Rolling 2-year performance
print("\n--- 1. Rolling 2-Year Performance ---\n")
for label, func in [("W1: rev_vol+gap_up", winner1_func),
("W2: earn_drift+rev_vol", winner2_func),
("Baseline: rec+mom", baseline_func)]:
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks)
roll = rolling_window_performance(eq)
if roll.empty:
continue
win_pct = (roll["ann_return"] > 0).mean()
print(f" {label}:")
print(f" Mean 2yr ann return: {roll['ann_return'].mean():+.1%}")
print(f" Min 2yr ann return: {roll['ann_return'].min():+.1%}")
print(f" Max 2yr ann return: {roll['ann_return'].max():+.1%}")
print(f" % positive 2yr: {win_pct:.0%}")
print(f" Mean 2yr Sharpe: {roll['sharpe'].mean():.2f}")
print()
# 2. Top-N sensitivity
print("--- 2. Top-N Sensitivity ---\n")
header = f" {'Top-N':<8}"
for label in ["W1: rev+gap", "W2: earn+rev", "Baseline"]:
header += f" | {'CAGR':>8} {'Sharpe':>8} {'MaxDD':>8}"
print(header)
print(" " + "-" * 100)
for top_n in [5, 10, 15, 20]:
line = f" {top_n:<8}"
for func in [winner1_func, winner2_func, baseline_func]:
w = make_strategy(stocks, func, top_n=top_n)
eq = run_backtest(w, stocks)
s = compute_stats(eq, "")
line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f} {s['maxdd']:>+7.1%}"
print(line)
# 3. Rebalance frequency
print("\n--- 3. Rebalance Frequency ---\n")
header = f" {'Rebal':<8}"
for label in ["W1: rev+gap", "W2: earn+rev", "Baseline"]:
header += f" | {'CAGR':>8} {'Sharpe':>8}"
print(header)
print(" " + "-" * 75)
for rebal in [5, 10, 21, 42]:
line = f" {rebal}d{'':<5}"
for func in [winner1_func, winner2_func, baseline_func]:
w = make_strategy(stocks, func, top_n=10, rebal_freq=rebal)
eq = run_backtest(w, stocks)
s = compute_stats(eq, "")
line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f}"
print(line)
# 4. Transaction cost sensitivity
print("\n--- 4. Transaction Cost Sensitivity ---\n")
header = f" {'Cost':<8}"
for label in ["W1: rev+gap", "W2: earn+rev", "Baseline"]:
header += f" | {'CAGR':>8} {'Sharpe':>8}"
print(header)
print(" " + "-" * 75)
for cost in [0, 0.001, 0.002, 0.005]:
line = f" {cost*10000:.0f}bps{'':<4}"
for func in [winner1_func, winner2_func, baseline_func]:
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks, cost=cost)
s = compute_stats(eq, "")
line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f}"
print(line)
# 5. Drawdown analysis
print("\n--- 5. Drawdown Episodes ---\n")
for label, func in [("W1: rev_vol+gap_up", winner1_func),
("W2: earn_drift+rev_vol", winner2_func),
("Baseline: rec+mom", baseline_func)]:
w = make_strategy(stocks, func, top_n=10)
eq = run_backtest(w, stocks)
dd = drawdown_analysis(eq)
print(f" {label}:")
if dd.empty:
print(" No significant drawdowns")
else:
for _, row in dd.iterrows():
print(f" {row['start']}{row['trough']}{row['end']}: "
f"{row['depth']:+.1%} ({row['duration_days']}d)")
print()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--market", default="both", choices=["us", "cn", "both"])
args = parser.parse_args()
if args.market in ("us", "both"):
prices = data_manager.load("us")
stocks = prices.drop(columns=["SPY"], errors="ignore")
run_us(stocks)
if args.market in ("cn", "both"):
prices = data_manager.load("cn")
stocks = prices.drop(columns=["000300.SS"], errors="ignore")
run_cn(stocks)
if __name__ == "__main__":
main()

factor_yearly_fresh.py (new file, 259 lines)

@@ -0,0 +1,259 @@
"""
Rebalancing frequency comparison: daily (1d) vs weekly (5d) vs biweekly (10d) vs monthly (21d).
Prints a yearly-returns table per frequency for all champion strategies
(drawdown, CAGR, Sharpe, and turnover helpers are included for ad-hoc use).
"""
from __future__ import annotations
import warnings
import numpy as np
import pandas as pd
import data_manager
from factor_loop import (
strat, bt, stats, combo,
f_rec_mom, f_rec_126, f_rec_63,
f_mom_12_1, f_mom_6_1, f_mom_intermediate,
f_above_ma200, f_golden_cross,
f_up_volume_proxy, f_gap_up_freq,
f_rec_mom_filtered, f_down_resilience,
f_up_capture, f_52w_high, f_str_10d,
f_earnings_drift, f_reversal_vol,
)
warnings.filterwarnings("ignore")
INITIAL = 10_000
REBAL_CONFIGS = [
("daily", 1),
("weekly", 5),
("biweekly", 10),
("monthly", 21),
]
def f_quality_mom(p):
mom = f_mom_12_1(p)
consist = (p.pct_change() > 0).astype(float).rolling(252, min_periods=126).mean()
mom_r = mom.rank(axis=1, pct=True, na_option="keep")
con_r = consist.rank(axis=1, pct=True, na_option="keep")
up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep")
return 0.4 * mom_r + 0.3 * con_r + 0.3 * up_r
def f_mom_x_gap(p):
return (f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") *
f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep"))
def run_equity(func, prices, rebal=21, cost=0.001):
w = strat(prices, func, top_n=10, rebal=rebal)
eq = bt(w, prices, cost=cost)
return eq / eq.iloc[0] * INITIAL
def year_returns(eq: pd.Series) -> dict[int, float]:
dr = eq.pct_change().fillna(0)
return {y: float((1 + dr[dr.index.year == y]).prod() - 1)
for y in sorted(dr.index.year.unique())}
def max_drawdown(eq: pd.Series) -> float:
rm = eq.cummax()
dd = (eq - rm) / rm
return float(dd.min())
def max_drawdown_yearly(eq: pd.Series) -> dict[int, float]:
result = {}
for y in sorted(eq.index.year.unique()):
chunk = eq[eq.index.year == y]
if len(chunk) < 5:
continue
rm = chunk.cummax()
dd = (chunk - rm) / rm
result[y] = float(dd.min())
return result
def cagr(eq: pd.Series) -> float:
dr = eq.pct_change().dropna()
if len(dr) < 100:
return np.nan
ny = len(dr) / 252
tot = eq.iloc[-1] / eq.iloc[0] - 1
return (1 + tot) ** (1 / ny) - 1
def sharpe(eq: pd.Series) -> float:
dr = eq.pct_change().dropna()
if len(dr) < 100 or dr.std() == 0:
return np.nan
return float(dr.mean() / dr.std() * np.sqrt(252))
def turnover_annual(func, prices, rebal):
"""Estimate annualised turnover (one-way)."""
w = strat(prices, func, top_n=10, rebal=rebal)
daily_turn = w.diff().abs().sum(axis=1).mean()
return daily_turn * 252
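# Cost sanity check (editor's sketch): with one-way annual turnover T and
# proportional cost c, the drag on returns is roughly T * c per year, e.g. a
# hypothetical T = 8.0 at c = 0.001 (10 bps) costs about 0.8%/year, which is
# why daily rebalancing must clear a higher gross-return bar than monthly.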
def print_by_year(strat_defs, prices, bench_eq, bench_label, market_label, years):
"""For each year, print a table: strategies as rows, rebal frequencies as columns."""
freq_labels = [r for r, _ in REBAL_CONFIGS]
# Pre-compute all equities and returns
all_eqs = {} # {(sname, freq): equity}
for sname, func in strat_defs.items():
for rlabel, rdays in REBAL_CONFIGS:
all_eqs[(sname, rlabel)] = run_equity(func, prices, rebal=rdays)
all_rets = {} # {(sname, freq): {year: ret}}
for key, eq in all_eqs.items():
all_rets[key] = year_returns(eq)
bench_rets = year_returns(bench_eq)
snames = list(strat_defs.keys())
name_w = max(len(s) for s in snames) + 1
for year in years:
line_w = name_w + 4 + 20 * (len(freq_labels) + 1)
print(f"\n{'=' * line_w}")
print(f" {market_label}{year} (fresh $10,000)")
print(f"{'=' * line_w}")
# Header
print(f" {'Strategy':<{name_w}}", end="")
for f in freq_labels:
print(f" {f:>18}", end="")
print(f" {bench_label:>18}")
print(f" {'-'*name_w}", end="")
for _ in range(len(freq_labels) + 1):
print(f" {'-'*18}", end="")
print()
for sname in snames:
print(f" {sname:<{name_w}}", end="")
# Find best freq for this strategy this year
freq_vals = {}
for f in freq_labels:
r = all_rets[(sname, f)].get(year)
if r is not None and abs(r) > 0.0005:
freq_vals[f] = r
best_f = max(freq_vals, key=freq_vals.get) if freq_vals else None
for f in freq_labels:
r = all_rets[(sname, f)].get(year)
if r is not None and abs(r) > 0.0005:
v = INITIAL * (1 + r)
marker = "" if f == best_f else " "
print(f" ${v:>9,.0f} {r:>+5.0%}{marker}", end="")
else:
print(f" {'':>18}", end="")
# Benchmark (same for all strategies)
br = bench_rets.get(year)
if br is not None and abs(br) > 0.0005:
print(f" ${INITIAL*(1+br):>9,.0f} {br:>+5.0%} ", end="")
else:
print(f" {'':>18}", end="")
print()
# Best strategy per freq
print(f" {'-'*name_w}", end="")
for _ in range(len(freq_labels) + 1):
print(f" {'-'*18}", end="")
print()
print(f" {'BEST':<{name_w}}", end="")
for f in freq_labels:
best_r = -999
best_s = ""
for sname in snames:
r = all_rets[(sname, f)].get(year)
if r is not None and abs(r) > 0.0005 and r > best_r:
best_r = r
best_s = sname
if best_r > -999:
print(f" ${INITIAL*(1+best_r):>9,.0f} {best_r:>+5.0%} ", end="")
else:
print(f" {'':>18}", end="")
# bench
br = bench_rets.get(year)
if br is not None and abs(br) > 0.0005:
print(f" ${INITIAL*(1+br):>9,.0f} {br:>+5.0%} ", end="")
else:
print(f" {'':>18}", end="")
print()
def main():
years = list(range(2015, 2027))
# ===== US =====
print(f"\n{'#'*130}")
print(f"{'#'*50} US MARKET {'#'*50}")
print(f"{'#'*130}")
prices_us = data_manager.load("us")
bench_us = prices_us["SPY"].dropna()
stocks_us = prices_us.drop(columns=["SPY"], errors="ignore")
eq_spy = bench_us / bench_us.iloc[0] * INITIAL
us_strats = {
"rec_mfilt+deep×upvol": combo([
(f_rec_mom_filtered, 0.5),
(combo([(f_rec_126, 0.5), (f_up_volume_proxy, 0.5)]), 0.5),
]),
"ma200+mom7m+rec126": combo([
(f_above_ma200, 0.33), (f_mom_intermediate, 0.33), (f_rec_126, 0.34)
]),
"rec_mfilt+ma200": combo([
(f_rec_mom_filtered, 0.5), (f_above_ma200, 0.5)
]),
"mom7m+rec126": combo([
(f_mom_intermediate, 0.5), (f_rec_126, 0.5)
]),
"BASELINE:rec+mom": f_rec_mom,
}
print_by_year(us_strats, stocks_us, eq_spy, "SPY", "US", years)
# ===== CN =====
print(f"\n\n{'#'*130}")
print(f"{'#'*50} CN MARKET {'#'*50}")
print(f"{'#'*130}")
prices_cn = data_manager.load("cn")
bench_cn = prices_cn["000300.SS"].dropna() if "000300.SS" in prices_cn.columns else None
stocks_cn = prices_cn.drop(columns=["000300.SS"], errors="ignore")
cn_strats = {
"up_cap+quality_mom": combo([
(f_up_capture, 0.5), (f_quality_mom, 0.5)
]),
"down_resil+qual_mom": combo([
(f_down_resilience, 0.5), (f_quality_mom, 0.5)
]),
"rec63+mom×gap": combo([
(f_rec_63, 0.5), (f_mom_x_gap, 0.5)
]),
"up_cap+mom×gap": combo([
(f_up_capture, 0.5), (f_mom_x_gap, 0.5)
]),
"BASELINE:rec+mom": f_rec_mom,
}
if bench_cn is not None:
eq_csi = bench_cn / bench_cn.iloc[0] * INITIAL
else:
eq_csi = pd.Series(dtype=float)
print_by_year(cn_strats, stocks_cn, eq_csi, "CSI300", "CN", years)
if __name__ == "__main__":
main()

factor_yearly_report.py (new file, 219 lines)

@@ -0,0 +1,219 @@
"""
Yearly ROI report for champion strategies vs benchmarks (SPY / CSI 300), starting from $10,000.
"""
from __future__ import annotations
import warnings
import numpy as np
import pandas as pd
import data_manager
from universe import UNIVERSES
from factor_loop import (
strat, bt, stats, combo,
f_rec_mom, f_rec_126, f_rec_63,
f_mom_12_1, f_mom_6_1, f_mom_intermediate,
f_above_ma200, f_golden_cross,
f_up_volume_proxy, f_gap_up_freq,
f_rec_mom_filtered, f_down_resilience,
f_up_capture, f_52w_high, f_str_10d,
f_earnings_drift, f_reversal_vol,
)
warnings.filterwarnings("ignore")
INITIAL = 10_000
def f_quality_mom(p):
mom = f_mom_12_1(p)
consist = (p.pct_change() > 0).astype(float).rolling(252, min_periods=126).mean()
mom_r = mom.rank(axis=1, pct=True, na_option="keep")
con_r = consist.rank(axis=1, pct=True, na_option="keep")
up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep")
return 0.4 * mom_r + 0.3 * con_r + 0.3 * up_r
def f_mom_x_gap(p):
return (f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") *
f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep"))
def run_equity(func, prices, cost=0.001):
w = strat(prices, func, top_n=10)
eq = bt(w, prices, cost=cost)
return eq / eq.iloc[0] * INITIAL
def yearly_table(equities: dict[str, pd.Series], title: str):
print(f"\n{'='*130}")
print(f" {title}")
print(f" Starting capital: ${INITIAL:,.0f}")
print(f"{'='*130}")
names = list(equities.keys())
all_years = sorted(set(y for eq in equities.values() for y in eq.index.year.unique()))
# Header
print(f"\n {'Year':<6}", end="")
for n in names:
print(f" | {n[:24]:>24}", end="")
print()
print(f" {'-'*6}", end="")
for _ in names:
print(f"-+-{'-'*24}", end="")
print()
# Track portfolio values
year_end_vals = {n: INITIAL for n in names}
for year in all_years:
print(f" {year:<6}", end="")
for n in names:
eq = equities[n]
yr_data = eq[eq.index.year == year]
if len(yr_data) < 2:
print(f" | {'':>24}", end="")
continue
start_val = yr_data.iloc[0]
end_val = yr_data.iloc[-1]
ret = end_val / start_val - 1
year_end_vals[n] = end_val
# Show both return % and portfolio value
print(f" | {ret:>+7.1%} ${end_val:>12,.0f}", end="")
print()
# Summary rows
print(f" {'-'*6}", end="")
for _ in names:
print(f"-+-{'-'*24}", end="")
print()
# Total return
print(f" {'Total':<6}", end="")
for n in names:
eq = equities[n]
total = eq.iloc[-1] / INITIAL - 1
print(f" | {total:>+7.0%} ${eq.iloc[-1]:>12,.0f}", end="")
print()
# CAGR
print(f" {'CAGR':<6}", end="")
for n in names:
eq = equities[n]
ny = len(eq) / 252
total = eq.iloc[-1] / INITIAL - 1
cagr = (1 + total) ** (1 / ny) - 1
print(f" | {cagr:>+7.1%} {'':>12}", end="")
print()
# Sharpe
print(f" {'Sharpe':<6}", end="")
for n in names:
eq = equities[n]
dr = eq.pct_change().dropna()
sh = dr.mean() / dr.std() * np.sqrt(252) if dr.std() > 0 else 0
print(f" | {sh:>7.2f} {'':>12}", end="")
print()
# Max DD
print(f" {'MaxDD':<6}", end="")
for n in names:
eq = equities[n]
rm = eq.cummax()
dd = ((eq - rm) / rm).min()
print(f" | {dd:>+7.1%} {'':>12}", end="")
print()
# Best/Worst year
print(f" {'Best':<6}", end="")
for n in names:
eq = equities[n]
dr = eq.pct_change().fillna(0)
yr_rets = {y: float((1 + dr[dr.index.year == y]).prod() - 1) for y in all_years}
# skip warmup year
active = {y: r for y, r in yr_rets.items() if abs(r) > 0.001}
if active:
best_y = max(active, key=active.get)
print(f" | {active[best_y]:>+7.1%} ({best_y}) ", end="")
else:
print(f" | {'':>24}", end="")
print()
print(f" {'Worst':<6}", end="")
for n in names:
eq = equities[n]
dr = eq.pct_change().fillna(0)
yr_rets = {y: float((1 + dr[dr.index.year == y]).prod() - 1) for y in all_years}
active = {y: r for y, r in yr_rets.items() if abs(r) > 0.001}
if active:
worst_y = min(active, key=active.get)
print(f" | {active[worst_y]:>+7.1%} ({worst_y}) ", end="")
else:
print(f" | {'':>24}", end="")
print()
def main():
# ===== US =====
prices_us = data_manager.load("us")
bench_us = prices_us["SPY"].dropna()
stocks_us = prices_us.drop(columns=["SPY"], errors="ignore")
eq_spy = bench_us / bench_us.iloc[0] * INITIAL
us_strats = {
"rec_mfilt+deep×upvol": combo([
(f_rec_mom_filtered, 0.5),
(combo([(f_rec_126, 0.5), (f_up_volume_proxy, 0.5)]), 0.5),
]),
"ma200+mom7m+rec126": combo([
(f_above_ma200, 0.33), (f_mom_intermediate, 0.33), (f_rec_126, 0.34)
]),
"rec_mfilt+ma200": combo([
(f_rec_mom_filtered, 0.5), (f_above_ma200, 0.5)
]),
"mom7m+rec126": combo([
(f_mom_intermediate, 0.5), (f_rec_126, 0.5)
]),
"BASELINE:rec+mom": f_rec_mom,
}
us_equities = {}
for name, func in us_strats.items():
us_equities[name] = run_equity(func, stocks_us)
us_equities["SPY (Benchmark)"] = eq_spy
yearly_table(us_equities, "US MARKET — Champion Strategies vs SPY — $10,000 Starting Capital")
# ===== CN =====
prices_cn = data_manager.load("cn")
bench_cn = prices_cn["000300.SS"].dropna() if "000300.SS" in prices_cn.columns else None
stocks_cn = prices_cn.drop(columns=["000300.SS"], errors="ignore")
cn_strats = {
"up_cap+quality_mom": combo([
(f_up_capture, 0.5), (f_quality_mom, 0.5)
]),
"down_resil+qual_mom": combo([
(f_down_resilience, 0.5), (f_quality_mom, 0.5)
]),
"rec63+mom×gap": combo([
(f_rec_63, 0.5), (f_mom_x_gap, 0.5)
]),
"up_cap+mom×gap": combo([
(f_up_capture, 0.5), (f_mom_x_gap, 0.5)
]),
"BASELINE:rec+mom": f_rec_mom,
}
cn_equities = {}
for name, func in cn_strats.items():
cn_equities[name] = run_equity(func, stocks_cn)
if bench_cn is not None:
cn_equities["CSI300 (Benchmark)"] = bench_cn / bench_cn.iloc[0] * INITIAL
yearly_table(cn_equities, "CN MARKET — Champion Strategies vs CSI 300 — $10,000 Starting Capital")
if __name__ == "__main__":
main()

strategies/factor_combo.py (new file, 218 lines)

@@ -0,0 +1,218 @@
"""
Factor combination strategies discovered through iterative factor research.
US champions:
- rec_mfilt+deep×upvol: Recovery (momentum-filtered) + deep recovery × up-volume
- ma200+mom7m+rec126: Above MA200 + intermediate momentum + deep recovery
- rec_mfilt+ma200: Recovery (momentum-filtered) + above MA200
- mom7m+rec126: Intermediate momentum + deep recovery
CN champions:
- up_cap+quality_mom: Up-capture ratio + quality momentum composite
- down_resil+qual_mom: Down-resilience + quality momentum composite
- rec63+mom×gap: Recovery 63d + momentum × gap-up frequency
- up_cap+mom×gap: Up-capture + momentum × gap-up frequency
Each can run at daily/weekly/biweekly/monthly rebalancing frequency.
"""
import numpy as np
import pandas as pd
from strategies.base import Strategy
# ---------------------------------------------------------------------------
# Factor building blocks
# ---------------------------------------------------------------------------
def _mom_12_1(p):
return p.shift(21).pct_change(231)
def _mom_intermediate(p):
return p.shift(21).pct_change(147)
def _rec_63(p):
return p / p.rolling(63, min_periods=63).min() - 1
def _rec_126(p):
return p / p.rolling(126, min_periods=126).min() - 1
def _above_ma200(p):
return p / p.rolling(200, min_periods=200).mean() - 1
def _up_volume_proxy(p):
ret = p.pct_change()
return ret.where(ret > 0, 0).rolling(20, min_periods=15).sum()
def _gap_up_freq(p):
ret = p.pct_change()
return (ret > 0.01).astype(float).rolling(60, min_periods=40).mean()
def _consistent_returns(p):
ret = p.pct_change()
return (ret > 0).astype(float).rolling(252, min_periods=126).mean()
def _rec_mom_filtered(p):
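# 126-day recovery, kept only where 6-1 month momentum is positive; filtered-out
# stocks become NaN and drop from the cross-sectional rank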
rec = p / p.rolling(126, min_periods=126).min() - 1
mom = p.shift(21).pct_change(105)
return rec.where(mom > 0, np.nan)
def _up_capture(p):
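# Average stock return on market-up days divided by the average market return
# on those same days (60-day window, at least 20 up days); >1 = outruns rallies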
ret = p.pct_change()
mkt = ret.mean(axis=1)
up_mkt = mkt > 0
arr = ret.values.copy()
arr[~up_mkt.values, :] = np.nan
stock_up = pd.DataFrame(arr, index=ret.index, columns=ret.columns)
mkt_up_vals = mkt.where(up_mkt, np.nan)
stock_avg = stock_up.rolling(60, min_periods=20).mean()
mkt_avg = mkt_up_vals.rolling(60, min_periods=20).mean()
return stock_avg.div(mkt_avg, axis=0)
def _down_resilience(p):
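# Negative of the average stock return on market-down days (120-day window);
# higher score = loses less when the market falls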
ret = p.pct_change()
mkt = ret.mean(axis=1)
down_mkt = mkt < 0
arr = ret.values.copy()
arr[~down_mkt.values, :] = np.nan
down_ret = pd.DataFrame(arr, index=ret.index, columns=ret.columns)
return -down_ret.rolling(120, min_periods=30).mean()
def _quality_mom(p):
mom_r = _mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
con_r = _consistent_returns(p).rank(axis=1, pct=True, na_option="keep")
up_r = _up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep")
return 0.4 * mom_r + 0.3 * con_r + 0.3 * up_r
def _mom_x_gap(p):
mom_r = _mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
gap_r = _gap_up_freq(p).rank(axis=1, pct=True, na_option="keep")
return mom_r * gap_r
# ---------------------------------------------------------------------------
# Combo signal constructors (weighted rank sums)
# ---------------------------------------------------------------------------
def _rank(df):
return df.rank(axis=1, pct=True, na_option="keep")
# US combos
def signal_rec_mfilt_deep_upvol(p):
rec_mfilt_r = _rank(_rec_mom_filtered(p))
deep_upvol_r = _rank(_rec_126(p)) * _rank(_up_volume_proxy(p))
deep_upvol_rr = _rank(deep_upvol_r)
return 0.5 * rec_mfilt_r + 0.5 * deep_upvol_rr
def signal_ma200_mom7m_rec126(p):
return (0.33 * _rank(_above_ma200(p))
+ 0.33 * _rank(_mom_intermediate(p))
+ 0.34 * _rank(_rec_126(p)))
def signal_rec_mfilt_ma200(p):
return 0.5 * _rank(_rec_mom_filtered(p)) + 0.5 * _rank(_above_ma200(p))
def signal_mom7m_rec126(p):
return 0.5 * _rank(_mom_intermediate(p)) + 0.5 * _rank(_rec_126(p))
# CN combos
def signal_up_cap_quality_mom(p):
return 0.5 * _rank(_up_capture(p)) + 0.5 * _rank(_quality_mom(p))
def signal_down_resil_qual_mom(p):
return 0.5 * _rank(_down_resilience(p)) + 0.5 * _rank(_quality_mom(p))
def signal_rec63_mom_gap(p):
return 0.5 * _rank(_rec_63(p)) + 0.5 * _rank(_mom_x_gap(p))
def signal_up_cap_mom_gap(p):
return 0.5 * _rank(_up_capture(p)) + 0.5 * _rank(_mom_x_gap(p))
# ---------------------------------------------------------------------------
# Signal registry: name -> callable(prices) -> DataFrame
# ---------------------------------------------------------------------------
SIGNAL_REGISTRY = {
# US
"rec_mfilt+deep_upvol": signal_rec_mfilt_deep_upvol,
"ma200+mom7m+rec126": signal_ma200_mom7m_rec126,
"rec_mfilt+ma200": signal_rec_mfilt_ma200,
"mom7m+rec126": signal_mom7m_rec126,
# CN
"up_cap+quality_mom": signal_up_cap_quality_mom,
"down_resil+qual_mom": signal_down_resil_qual_mom,
"rec63+mom_gap": signal_rec63_mom_gap,
"up_cap+mom_gap": signal_up_cap_mom_gap,
}
# ---------------------------------------------------------------------------
# Strategy class
# ---------------------------------------------------------------------------
class FactorComboStrategy(Strategy):
"""
Generic factor-combination strategy with configurable rebalancing frequency.
Parameters:
signal_name: key into SIGNAL_REGISTRY
rebal_freq: rebalancing interval in trading days (1=daily, 5=weekly, 10=biweekly, 21=monthly)
top_n: number of stocks to hold
"""
REBAL_LABELS = {1: "daily", 5: "weekly", 10: "biweekly", 21: "monthly"}
def __init__(self, signal_name: str, rebal_freq: int = 21, top_n: int = 10):
if signal_name not in SIGNAL_REGISTRY:
raise ValueError(f"Unknown signal: {signal_name}. "
f"Available: {list(SIGNAL_REGISTRY.keys())}")
self.signal_name = signal_name
self.signal_func = SIGNAL_REGISTRY[signal_name]
self.rebal_freq = rebal_freq
self.top_n = top_n
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
sig = self.signal_func(data)
# Select top_n by signal rank
rank = sig.rank(axis=1, ascending=False, na_option="bottom")
n_valid = sig.notna().sum(axis=1)
enough = n_valid >= self.top_n
top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)
raw = top_mask.astype(float)
row_sums = raw.sum(axis=1).replace(0, np.nan)
signals = raw.div(row_sums, axis=0).fillna(0.0)
# Rebalance at configured frequency
warmup = 252
rebal_mask = pd.Series(False, index=data.index)
rebal_indices = list(range(warmup, len(data), self.rebal_freq))
rebal_mask.iloc[rebal_indices] = True
signals[~rebal_mask] = np.nan
signals = signals.ffill().fillna(0.0)
signals.iloc[:warmup] = 0.0
return signals.shift(1).fillna(0.0)
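# Example (editor's sketch): a US champion at monthly rebalancing. The returned
# weights are already lagged one day by generate_signals, and rows sum to ~1.0
# once past the 252-day warmup:
#
# strat = FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=21, top_n=10)
# weights = strat.generate_signals(prices) # prices: dates x tickers close frame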

trader.py (modified)

@@ -40,6 +40,7 @@ import yfinance as yf
import data_manager
from strategies.buy_and_hold import BuyAndHoldStrategy
from strategies.dual_momentum import DualMomentumStrategy
from strategies.factor_combo import FactorComboStrategy
from strategies.inverse_vol import InverseVolatilityStrategy
from strategies.momentum import MomentumStrategy
from strategies.momentum_quality import MomentumQualityStrategy
@@ -52,6 +53,7 @@ from universe import UNIVERSES
# ---------------------------------------------------------------------------
STRATEGY_REGISTRY = {
# --- Original strategies ---
"recovery_mom_top10": lambda **kw: RecoveryMomentumStrategy(top_n=10),
"recovery_mom_top20": lambda **kw: RecoveryMomentumStrategy(top_n=20),
"recovery_mom_top50": lambda **kw: RecoveryMomentumStrategy(top_n=50),
@@ -61,6 +63,40 @@ STRATEGY_REGISTRY = {
"inverse_vol": lambda **kw: InverseVolatilityStrategy(vol_window=20),
"trend_following": lambda **kw: TrendFollowingStrategy(top_n=kw.get("top_n", 20)),
"buy_and_hold": lambda **kw: BuyAndHoldStrategy(),
# --- Factor combo: US champions ---
"fc_rec_mfilt_deep_upvol_daily": lambda **kw: FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=1),
"fc_rec_mfilt_deep_upvol_weekly": lambda **kw: FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=5),
"fc_rec_mfilt_deep_upvol_biweekly": lambda **kw: FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=10),
"fc_rec_mfilt_deep_upvol_monthly": lambda **kw: FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=21),
"fc_ma200_mom7m_rec126_daily": lambda **kw: FactorComboStrategy("ma200+mom7m+rec126", rebal_freq=1),
"fc_ma200_mom7m_rec126_weekly": lambda **kw: FactorComboStrategy("ma200+mom7m+rec126", rebal_freq=5),
"fc_ma200_mom7m_rec126_biweekly": lambda **kw: FactorComboStrategy("ma200+mom7m+rec126", rebal_freq=10),
"fc_ma200_mom7m_rec126_monthly": lambda **kw: FactorComboStrategy("ma200+mom7m+rec126", rebal_freq=21),
"fc_rec_mfilt_ma200_daily": lambda **kw: FactorComboStrategy("rec_mfilt+ma200", rebal_freq=1),
"fc_rec_mfilt_ma200_weekly": lambda **kw: FactorComboStrategy("rec_mfilt+ma200", rebal_freq=5),
"fc_rec_mfilt_ma200_biweekly": lambda **kw: FactorComboStrategy("rec_mfilt+ma200", rebal_freq=10),
"fc_rec_mfilt_ma200_monthly": lambda **kw: FactorComboStrategy("rec_mfilt+ma200", rebal_freq=21),
"fc_mom7m_rec126_daily": lambda **kw: FactorComboStrategy("mom7m+rec126", rebal_freq=1),
"fc_mom7m_rec126_weekly": lambda **kw: FactorComboStrategy("mom7m+rec126", rebal_freq=5),
"fc_mom7m_rec126_biweekly": lambda **kw: FactorComboStrategy("mom7m+rec126", rebal_freq=10),
"fc_mom7m_rec126_monthly": lambda **kw: FactorComboStrategy("mom7m+rec126", rebal_freq=21),
# --- Factor combo: CN champions ---
"fc_up_cap_quality_mom_daily": lambda **kw: FactorComboStrategy("up_cap+quality_mom", rebal_freq=1),
"fc_up_cap_quality_mom_weekly": lambda **kw: FactorComboStrategy("up_cap+quality_mom", rebal_freq=5),
"fc_up_cap_quality_mom_biweekly": lambda **kw: FactorComboStrategy("up_cap+quality_mom", rebal_freq=10),
"fc_up_cap_quality_mom_monthly": lambda **kw: FactorComboStrategy("up_cap+quality_mom", rebal_freq=21),
"fc_down_resil_qual_mom_daily": lambda **kw: FactorComboStrategy("down_resil+qual_mom", rebal_freq=1),
"fc_down_resil_qual_mom_weekly": lambda **kw: FactorComboStrategy("down_resil+qual_mom", rebal_freq=5),
"fc_down_resil_qual_mom_biweekly": lambda **kw: FactorComboStrategy("down_resil+qual_mom", rebal_freq=10),
"fc_down_resil_qual_mom_monthly": lambda **kw: FactorComboStrategy("down_resil+qual_mom", rebal_freq=21),
"fc_rec63_mom_gap_daily": lambda **kw: FactorComboStrategy("rec63+mom_gap", rebal_freq=1),
"fc_rec63_mom_gap_weekly": lambda **kw: FactorComboStrategy("rec63+mom_gap", rebal_freq=5),
"fc_rec63_mom_gap_biweekly": lambda **kw: FactorComboStrategy("rec63+mom_gap", rebal_freq=10),
"fc_rec63_mom_gap_monthly": lambda **kw: FactorComboStrategy("rec63+mom_gap", rebal_freq=21),
"fc_up_cap_mom_gap_daily": lambda **kw: FactorComboStrategy("up_cap+mom_gap", rebal_freq=1),
"fc_up_cap_mom_gap_weekly": lambda **kw: FactorComboStrategy("up_cap+mom_gap", rebal_freq=5),
"fc_up_cap_mom_gap_biweekly": lambda **kw: FactorComboStrategy("up_cap+mom_gap", rebal_freq=10),
"fc_up_cap_mom_gap_monthly": lambda **kw: FactorComboStrategy("up_cap+mom_gap", rebal_freq=21),
}
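# Example (assumed CLI shape; the --market/--strategy flags are hypothetical,
# inferred from the registry keys above and the auto subcommand):
#
# uv run python trader.py auto --market us --strategy fc_rec_mfilt_deep_upvol_monthly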