From ae25f2f6b597cb0b048889de9775bef93938457f Mon Sep 17 00:00:00 2001
From: Gahow Wang
Date: Wed, 8 Apr 2026 10:41:34 +0800
Subject: [PATCH] Add 32 factor-combo strategies with configurable rebalancing
 frequency
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

New FactorComboStrategy class (strategies/factor_combo.py) implements 8
champion factor signals (4 US, 4 CN) discovered through iterative factor
research, each at 4 rebalancing frequencies (daily/weekly/biweekly/monthly).
Registered in trader.py as fc_{signal}_{freq}.

Existing strategies and state files are untouched — safe to git pull and
restart monitor on server.

Also includes factor research scripts (factor_loop.py, factor_research.py,
etc.) used to discover and validate these factors.

Co-Authored-By: Claude Opus 4.6
---
 .gitignore                 |   8 +
 CLAUDE.md                  |   3 +-
 factor_backtest.py         | 213 ++++++++++++
 factor_deep_analysis.py    | 324 ++++++++++++++++++
 factor_final_check.py      | 150 +++++++++
 factor_loop.py             | 654 +++++++++++++++++++++++++++++++++++++
 factor_real_backtest.py    | 449 +++++++++++++++++++++++++
 factor_research.py         | 547 +++++++++++++++++++++++++++++++
 factor_robustness.py       | 323 ++++++++++++++++++
 factor_yearly_fresh.py     | 259 +++++++++++++++
 factor_yearly_report.py    | 219 +++++++++++++
 strategies/factor_combo.py | 218 +++++++++++++
 trader.py                  |  36 ++
 13 files changed, 3402 insertions(+), 1 deletion(-)
 create mode 100644 factor_backtest.py
 create mode 100644 factor_deep_analysis.py
 create mode 100644 factor_final_check.py
 create mode 100644 factor_loop.py
 create mode 100644 factor_real_backtest.py
 create mode 100644 factor_research.py
 create mode 100644 factor_robustness.py
 create mode 100644 factor_yearly_fresh.py
 create mode 100644 factor_yearly_report.py
 create mode 100644 strategies/factor_combo.py

diff --git a/.gitignore b/.gitignore
index 83650a1..273e00b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,14 @@ data/universe_*.json
 # Trader state — per-machine, regenerated by auto/simulate
 data/trader_*.json
 
+# Factor attribution output and cached factors
+data/attribution_*/
+data/factors/
+data/factors_review_tmp/
+
+# External tool artifacts
+docs/superpowers/
+
 # IDE / editor
 .idea/
 .vscode/
diff --git a/CLAUDE.md b/CLAUDE.md
index b220474..d753ade 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -44,7 +44,7 @@ No test suite or linter is configured.
 
 **Backtest engine** (`main.py`): Orchestrates data loading, strategy execution, and visualization. The `backtest()` function is vectorized — it takes a strategy and price DataFrame, applies transaction costs (proportional + optional fixed per-trade fee) via turnover, and returns an equity curve. Supports two execution modes: `close` (classic) and `open-close` (signal on open prices, execute at close).
 
-**Daily trader** (`trader.py`): Live/forward-testing system with persistent portfolio state in `data/trader_{market}_{strategy}.json`. The `auto` subcommand runs both signal generation and execution in a single invocation — designed for cron. The `simulate` subcommand replays a date range day-by-day with realistic portfolio tracking (fractional shares, cash, commissions). Available strategies: `recovery_mom_top10`, `recovery_mom_top20`, `momentum`, `momentum_quality`, `dual_momentum`, `inverse_vol`, `trend_following`, `buy_and_hold`.
+**Daily trader** (`trader.py`): Live/forward-testing system with persistent portfolio state in `data/trader_{market}_{strategy}.json`. The `auto` subcommand runs both signal generation and execution in a single invocation — designed for cron. The `simulate` subcommand replays a date range day-by-day with realistic portfolio tracking (fractional shares, cash, commissions). Available strategies: `recovery_mom_top10`, `recovery_mom_top20`, `momentum`, `momentum_quality`, `dual_momentum`, `inverse_vol`, `trend_following`, `buy_and_hold`, plus 32 factor-combo strategies (`fc_{signal}_{freq}` — see `strategies/factor_combo.py`).
 
 **Strategy protocol** (`strategies/base.py`): All strategies inherit from `Strategy` ABC and implement `generate_signals(data) → DataFrame` where the returned DataFrame contains portfolio weights (rows = dates, columns = assets, values sum to ~1.0 per row). Each strategy is responsible for applying its own 1-day lag via `.shift(1)` to avoid lookahead bias — the backtest engine does not shift.
 
@@ -59,6 +59,7 @@ No test suite or linter is configured.
 - `momentum_quality.py` — Momentum + return consistency + low drawdown
 - `adaptive_momentum.py` — Momentum weighted by inverse volatility
 - `recovery_momentum.py` — Recovery (price/63d low) + 12-1mo momentum composite. Best US performer.
+- `factor_combo.py` — Configurable factor-combination strategies with daily/weekly/biweekly/monthly rebalancing. US champions: `rec_mfilt+deep_upvol` (50.7% CAGR monthly), `ma200+mom7m+rec126`, `rec_mfilt+ma200`, `mom7m+rec126`. CN champions: `up_cap+quality_mom` (26.1% CAGR monthly), `down_resil+qual_mom`, `rec63+mom_gap`, `up_cap+mom_gap`. All registered in trader.py as `fc_{signal}_{freq}` (e.g., `fc_rec_mfilt_deep_upvol_monthly`). 32 new strategies total.
 
 **Metrics** (`metrics.py`): Standalone functions for portfolio analytics (Sharpe, Sortino, Calmar, max drawdown, etc.). `summary()` prints a formatted report and returns a dict.
 
diff --git a/factor_backtest.py b/factor_backtest.py
new file mode 100644
index 0000000..897a3a7
--- /dev/null
+++ b/factor_backtest.py
@@ -0,0 +1,213 @@
+"""
+Backtest best factor combinations with yearly return breakdown.
+
+US best: momentum + recovery + low_downside_beta + short_term_reversal
+CN best: momentum + near_52w_high + vol_reversal
+"""
+
+from __future__ import annotations
+
+import argparse
+
+import numpy as np
+import pandas as pd
+
+import data_manager
+import metrics
+from universe import UNIVERSES
+from factor_research import (
+    factor_momentum_12_1,
+    factor_recovery,
+    factor_short_term_reversal,
+    factor_downside_beta_proxy,
+    factor_lottery_demand,
+    factor_turnover_reversal,
+    factor_52w_high_distance,
+)
+
+
+def build_strategy_signals(
+    prices: pd.DataFrame,
+    factor_funcs: list,
+    weights: list[float],
+    top_n: int = 10,
+    rebal_freq: int = 21,
+) -> pd.DataFrame:
+    """Build equal-weight top-N strategy from ranked factor combination."""
+    signals_list = [f(prices) for f in factor_funcs]
+    ranked = [s.rank(axis=1, pct=True, na_option="keep") for s in signals_list]
+    composite = sum(w * r for w, r in zip(weights, ranked))
+
+    # Warmup: need at least 252 days of history before taking positions
+    warmup = 252
+
+    rank = composite.rank(axis=1, ascending=False, na_option="bottom")
+    n_valid = composite.notna().sum(axis=1)
+    enough = n_valid >= top_n
+    top_mask = (rank <= top_n) & enough.values.reshape(-1, 1)
+
+    raw = top_mask.astype(float)
+    row_sums = raw.sum(axis=1).replace(0, np.nan)
+    signals = raw.div(row_sums, axis=0).fillna(0.0)
+
+    # Rebalance every rebal_freq trading days (21 ≈ monthly)
+    rebal_mask = pd.Series(False, index=prices.index)
+    rebal_indices = list(range(warmup, len(prices), rebal_freq))
+    rebal_mask.iloc[rebal_indices] = True
+    signals[~rebal_mask] = np.nan
+    signals = signals.ffill().fillna(0.0)
+    signals.iloc[:warmup] = 0.0
+
+    return signals.shift(1).fillna(0.0)
+
+
+def backtest_equity(signals: pd.DataFrame, prices: pd.DataFrame, cost: float = 0.001) -> pd.Series:
+    """Simple vectorized backtest returning equity curve."""
+    returns = prices.pct_change().fillna(0.0)
+    port_ret = (signals * returns).sum(axis=1)
+
+    # Transaction costs via turnover
+    turnover = signals.diff().abs().sum(axis=1)
+    port_ret -= turnover * cost
+
+    equity = (1 + port_ret).cumprod() * 100000
+    return equity
+
+
+def yearly_returns(equity: pd.Series) -> pd.DataFrame:
+    """Compute calendar year returns from equity curve."""
+    daily_ret = equity.pct_change().fillna(0)
+    years = daily_ret.index.year
+    rows = []
+    for year in sorted(years.unique()):
+        mask = years == year
+        yr_ret = (1 + daily_ret[mask]).prod() - 1
+        # Also compute max drawdown for the year
+        eq_yr = equity[mask]
+        running_max = eq_yr.cummax()
+        dd = (eq_yr - running_max) / running_max
+        rows.append({
+            "year": year,
+            "return": yr_ret,
+            "max_dd": dd.min(),
+            "start_val": float(eq_yr.iloc[0]),
+            "end_val": float(eq_yr.iloc[-1]),
+        })
+    return pd.DataFrame(rows).set_index("year")
+
+
+def run(market: str, years_list: list[int]):
+    config = UNIVERSES[market]
+    benchmark = config["benchmark"]
+
+    print(f"Loading {market.upper()} price data...")
+    prices = data_manager.load(market)
+    bench_prices = prices[benchmark] if benchmark in prices.columns else None
+    stocks = prices.drop(columns=[benchmark], errors="ignore")
+
+    if market == "us":
+        label = "Mom+Recovery+LowDBeta+STR"
+        factor_funcs = [factor_momentum_12_1, factor_recovery, factor_downside_beta_proxy, factor_short_term_reversal]
+        weights = [0.25, 0.25, 0.25, 0.25]
+        baseline_label = "Recovery+Mom (baseline)"
+        baseline_funcs = [factor_momentum_12_1, factor_recovery]
+        baseline_weights = [0.5, 0.5]
+    else:
+        label = "Mom+Near52wHigh+VolReversal"
+        factor_funcs = [factor_momentum_12_1, factor_52w_high_distance, factor_turnover_reversal]
+
weights = [0.40, 0.30, 0.30] + baseline_label = "Mom+Recovery (baseline)" + baseline_funcs = [factor_momentum_12_1, factor_recovery] + baseline_weights = [0.5, 0.5] + + for top_n in [10]: + print(f"\n{'='*90}") + print(f" {market.upper()} — Top {top_n} — {label}") + print(f"{'='*90}") + + # Best combo + sig = build_strategy_signals(stocks, factor_funcs, weights, top_n=top_n) + eq = backtest_equity(sig, stocks) + + # Baseline + sig_base = build_strategy_signals(stocks, baseline_funcs, baseline_weights, top_n=top_n) + eq_base = backtest_equity(sig_base, stocks) + + # Benchmark + if bench_prices is not None: + bp = bench_prices.dropna() + eq_bench = bp / bp.iloc[0] * 100000 + + for n_years in years_list: + cutoff = stocks.index[-1] - pd.DateOffset(years=n_years) + eq_slice = eq[eq.index >= cutoff] + eq_base_slice = eq_base[eq_base.index >= cutoff] + + if len(eq_slice) < 50: + continue + + # Normalize to starting capital + eq_norm = eq_slice / eq_slice.iloc[0] * 100000 + eq_base_norm = eq_base_slice / eq_base_slice.iloc[0] * 100000 + + yr = yearly_returns(eq_norm) + yr_base = yearly_returns(eq_base_norm) + + if bench_prices is not None: + eq_bench_slice = eq_bench[eq_bench.index >= cutoff] + eq_bench_norm = eq_bench_slice / eq_bench_slice.iloc[0] * 100000 + yr_bench = yearly_returns(eq_bench_norm) + + print(f"\n--- Last {n_years} Years (from {eq_slice.index[0].date()}) ---\n") + + # Combined table + print(f" {'Year':<6} | {label:>30} | {baseline_label:>25} | {'Benchmark':>12} | {'Alpha vs Bench':>14}") + print(f" {'-'*6}-+-{'-'*30}-+-{'-'*25}-+-{'-'*12}-+-{'-'*14}") + + all_years = sorted(yr.index.tolist()) + total_new = 1.0 + total_base = 1.0 + total_bench = 1.0 + + for y in all_years: + r_new = yr.loc[y, "return"] if y in yr.index else 0 + dd_new = yr.loc[y, "max_dd"] if y in yr.index else 0 + r_base = yr_base.loc[y, "return"] if y in yr_base.index else 0 + r_bench = yr_bench.loc[y, "return"] if bench_prices is not None and y in yr_bench.index else 0 + alpha = r_new - r_bench + + total_new *= (1 + r_new) + total_base *= (1 + r_base) + total_bench *= (1 + r_bench) + + print(f" {y:<6} | {r_new:>+14.2%} (dd {dd_new:>+7.2%}) | {r_base:>+25.2%} | {r_bench:>+12.2%} | {alpha:>+14.2%}") + + total_r_new = total_new - 1 + total_r_base = total_base - 1 + total_r_bench = total_bench - 1 + cagr_new = (total_new ** (1 / n_years)) - 1 + cagr_base = (total_base ** (1 / n_years)) - 1 + cagr_bench = (total_bench ** (1 / n_years)) - 1 + + print(f" {'-'*6}-+-{'-'*30}-+-{'-'*25}-+-{'-'*12}-+-{'-'*14}") + print(f" {'Total':<6} | {total_r_new:>+14.2%}{' '*16} | {total_r_base:>+25.2%} | {total_r_bench:>+12.2%} |") + print(f" {'CAGR':<6} | {cagr_new:>+14.2%}{' '*16} | {cagr_base:>+25.2%} | {cagr_bench:>+12.2%} |") + + # Full period metrics + print(f"\n Full metrics ({label}):") + daily_ret = eq_norm.pct_change().dropna() + sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252) if daily_ret.std() > 0 else 0 + running_max = eq_norm.cummax() + max_dd = ((eq_norm - running_max) / running_max).min() + print(f" Sharpe: {sharpe:.2f} | Max Drawdown: {max_dd:.2%} | Win Rate: {(daily_ret > 0).mean():.2%}") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--market", default="us", choices=["us", "cn"]) + args = parser.parse_args() + run(args.market, years_list=[3, 5, 10]) + + +if __name__ == "__main__": + main() diff --git a/factor_deep_analysis.py b/factor_deep_analysis.py new file mode 100644 index 0000000..3c6c6a7 --- /dev/null +++ b/factor_deep_analysis.py @@ -0,0 +1,324 @@ +""" +Deep factor 
analysis — orthogonality, proper correlations, residual alpha. + +For the top factor candidates identified in factor_research.py, this script: +1. Computes proper daily cross-sectional rank correlations between factors +2. Tests residual IC after neutralizing known factors (momentum, recovery) +3. Runs sub-period breakdown (2-year windows) +4. Tests factor combinations +""" + +from __future__ import annotations + +import argparse +import warnings + +import numpy as np +import pandas as pd + +import data_manager +from universe import UNIVERSES +from factor_research import ( + factor_momentum_12_1, + factor_recovery, + factor_inverse_vol, + factor_short_term_reversal, + factor_idio_vol_change, + factor_max_drawdown_recovery, + factor_mean_reversion_residual, + factor_skewness, + factor_high_low_range as factor_range_compression, + factor_52w_high_distance as factor_near_52w_high, + factor_downside_beta_proxy as factor_low_downside_beta, + factor_lottery_demand, + factor_turnover_reversal, + factor_gap_momentum as factor_smooth_momentum, + factor_up_down_vol_ratio, + factor_trend_strength, + factor_consecutive_up_days, + factor_volume_price_divergence, + factor_recovery_acceleration, + factor_relative_volume_momentum, + factor_price_level, + factor_liquidity_premium, + compute_ic, +) + +warnings.filterwarnings("ignore", category=FutureWarning) + + +def daily_cross_sectional_correlation( + sig_a: pd.DataFrame, sig_b: pd.DataFrame +) -> pd.Series: + """Daily cross-sectional Spearman correlation between two factor signals.""" + common_idx = sig_a.index.intersection(sig_b.index) + common_cols = sig_a.columns.intersection(sig_b.columns) + a = sig_a.loc[common_idx, common_cols] + b = sig_b.loc[common_idx, common_cols] + + corrs = {} + for date in common_idx: + va = a.loc[date].dropna() + vb = b.loc[date].dropna() + common = va.index.intersection(vb.index) + if len(common) < 30: + continue + c = va[common].corr(vb[common], method="spearman") + if np.isfinite(c): + corrs[date] = c + return pd.Series(corrs) + + +def proper_factor_correlation_matrix(factors: dict[str, pd.DataFrame]) -> pd.DataFrame: + """Compute average daily cross-sectional Spearman correlations.""" + names = list(factors.keys()) + n = len(names) + matrix = pd.DataFrame(1.0, index=names, columns=names) + + for i in range(n): + for j in range(i + 1, n): + corr_series = daily_cross_sectional_correlation(factors[names[i]], factors[names[j]]) + avg_corr = corr_series.mean() if len(corr_series) > 0 else np.nan + matrix.loc[names[i], names[j]] = avg_corr + matrix.loc[names[j], names[i]] = avg_corr + + return matrix + + +def residual_signal( + target: pd.DataFrame, + controls: list[pd.DataFrame], +) -> pd.DataFrame: + """Cross-sectionally orthogonalize target signal against control signals. 
+ For each day, regress target ranks on control ranks, return residual.""" + ranked_target = target.rank(axis=1, pct=True, na_option="keep") + ranked_controls = [c.rank(axis=1, pct=True, na_option="keep") for c in controls] + + residuals = pd.DataFrame(index=target.index, columns=target.columns, dtype=float) + + for date in target.index: + y = ranked_target.loc[date].dropna() + xs = [rc.loc[date].reindex(y.index) for rc in ranked_controls if date in rc.index] + if not xs: + residuals.loc[date] = y + continue + + x_df = pd.concat(xs, axis=1).dropna() + common = y.index.intersection(x_df.index) + if len(common) < 30: + continue + + y_c = y[common].values + x_c = x_df.loc[common].values + x_c = np.column_stack([np.ones(len(common)), x_c]) + + try: + coef, _, _, _ = np.linalg.lstsq(x_c, y_c, rcond=None) + resid = y_c - x_c @ coef + residuals.loc[date, common] = resid + except np.linalg.LinAlgError: + residuals.loc[date, common] = y[common].values + + return residuals + + +def subperiod_ic(signal: pd.DataFrame, prices: pd.DataFrame, horizon: int = 5, window_years: int = 2): + """Compute IC for each rolling sub-period.""" + fwd_ret = prices.pct_change(horizon).shift(-horizon) + ic_series = compute_ic(signal, fwd_ret) + if len(ic_series) == 0: + return pd.DataFrame() + + window = 252 * window_years + results = [] + start = ic_series.index[0] + while start < ic_series.index[-1]: + end = start + pd.DateOffset(years=window_years) + subset = ic_series[(ic_series.index >= start) & (ic_series.index < end)] + if len(subset) > 100: + results.append({ + "period": f"{start.year}-{end.year}", + "ic_mean": subset.mean(), + "ic_std": subset.std(), + "icir": subset.mean() / subset.std() if subset.std() > 0 else 0, + "pct_positive": (subset > 0).mean(), + "n_days": len(subset), + }) + start = end + return pd.DataFrame(results) + + +def test_factor_combination( + factors: dict[str, pd.DataFrame], + factor_names: list[str], + weights: list[float], + prices: pd.DataFrame, + label: str, +): + """Test a weighted combination of factors.""" + ranked = [factors[n].rank(axis=1, pct=True, na_option="keep") for n in factor_names] + combo = sum(w * r for w, r in zip(weights, ranked)) + + fwd_5d = prices.pct_change(5).shift(-5) + ic_series = compute_ic(combo, fwd_5d) + if len(ic_series) == 0: + return None + + return { + "combo": label, + "ic_5d": ic_series.mean(), + "icir_5d": ic_series.mean() / ic_series.std() if ic_series.std() > 0 else 0, + "ic_stab": (ic_series.rolling(252).mean().dropna() > 0).mean() if len(ic_series) > 252 else np.nan, + } + + +def run_analysis(market: str): + config = UNIVERSES[market] + benchmark = config["benchmark"] + + print(f"Loading {market.upper()} price data...") + prices = data_manager.load(market) + stocks = prices.drop(columns=[benchmark], errors="ignore") + print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} days") + + # Build factors + print("Computing factors...") + factors = {} + factors["momentum_12_1"] = factor_momentum_12_1(stocks) + factors["recovery"] = factor_recovery(stocks) + factors["inverse_vol"] = factor_inverse_vol(stocks) + factors["short_term_reversal"] = factor_short_term_reversal(stocks) + factors["drawdown_recovery"] = factor_max_drawdown_recovery(stocks) + factors["mean_rev_zscore"] = factor_mean_reversion_residual(stocks) + factors["neg_skewness"] = factor_skewness(stocks) + factors["near_52w_high"] = factor_near_52w_high(stocks) + factors["low_downside_beta"] = factor_low_downside_beta(stocks) + factors["smooth_momentum"] = factor_smooth_momentum(stocks) 
+ factors["recovery_accel"] = factor_recovery_acceleration(stocks) + factors["range_compression"] = factor_range_compression(stocks) + + if market == "cn": + factors["anti_lottery"] = factor_lottery_demand(stocks) + factors["vol_reversal"] = factor_turnover_reversal(stocks) + factors["low_price"] = factor_price_level(stocks) + factors["illiquidity"] = factor_liquidity_premium(stocks) + + # ---- 1. Proper Cross-Sectional Correlation Matrix ---- + print("\n" + "=" * 90) + print(f" 1. CROSS-SECTIONAL FACTOR CORRELATIONS — {market.upper()}") + print("=" * 90) + print("(Average daily Spearman correlation between factor ranks)\n") + + corr = proper_factor_correlation_matrix(factors) + print(corr.round(3).to_string()) + + # ---- 2. Residual IC after neutralizing known factors ---- + print("\n" + "=" * 90) + print(f" 2. RESIDUAL IC AFTER NEUTRALIZING KNOWN FACTORS — {market.upper()}") + print("=" * 90) + print("(IC of factor after cross-sectionally regressing out momentum + recovery)\n") + + known = [factors["momentum_12_1"], factors["recovery"]] + fwd_5d = stocks.pct_change(5).shift(-5) + + new_candidates = [k for k in factors if k not in ("momentum_12_1", "recovery", "inverse_vol")] + rows = [] + for name in new_candidates: + resid = residual_signal(factors[name], known) + ic_series = compute_ic(resid, fwd_5d) + if len(ic_series) > 0: + rows.append({ + "factor": name, + "raw_ic_5d": compute_ic(factors[name], fwd_5d).mean(), + "residual_ic_5d": ic_series.mean(), + "residual_icir_5d": ic_series.mean() / ic_series.std() if ic_series.std() > 0 else 0, + "pct_pos": (ic_series > 0).mean(), + }) + + resid_df = pd.DataFrame(rows).set_index("factor").sort_values("residual_icir_5d", ascending=False) + print(resid_df.round(4).to_string()) + + # ---- 3. Sub-Period Stability ---- + print("\n" + "=" * 90) + print(f" 3. SUB-PERIOD IC STABILITY (2-year windows, 5-day horizon) — {market.upper()}") + print("=" * 90) + + # Test top factors + if market == "us": + top_factors = ["low_downside_beta", "drawdown_recovery", "mean_rev_zscore", "short_term_reversal", "momentum_12_1"] + else: + top_factors = ["momentum_12_1", "anti_lottery", "inverse_vol", "vol_reversal", "near_52w_high"] + + for name in top_factors: + if name not in factors: + continue + print(f"\n {name}:") + sp = subperiod_ic(factors[name], stocks, horizon=5) + if not sp.empty: + print(sp.to_string(index=False)) + else: + print(" (insufficient data)") + + # ---- 4. Factor Combinations ---- + print("\n" + "=" * 90) + print(f" 4. 
FACTOR COMBINATIONS — {market.upper()}") + print("=" * 90) + print("(Testing multi-factor composites)\n") + + combos = [] + if market == "us": + tests = [ + (["momentum_12_1", "low_downside_beta"], [0.5, 0.5], "mom+low_dbeta"), + (["momentum_12_1", "drawdown_recovery"], [0.5, 0.5], "mom+dd_recovery"), + (["momentum_12_1", "mean_rev_zscore"], [0.5, 0.5], "mom+mean_rev"), + (["momentum_12_1", "short_term_reversal"], [0.5, 0.5], "mom+STR"), + (["recovery", "low_downside_beta"], [0.5, 0.5], "recovery+low_dbeta"), + (["momentum_12_1", "recovery", "low_downside_beta"], [0.33, 0.33, 0.34], "mom+rec+low_dbeta"), + (["momentum_12_1", "recovery", "drawdown_recovery"], [0.33, 0.33, 0.34], "mom+rec+dd_rec"), + (["momentum_12_1", "recovery", "short_term_reversal"], [0.33, 0.33, 0.34], "mom+rec+STR"), + (["momentum_12_1", "recovery", "mean_rev_zscore"], [0.33, 0.33, 0.34], "mom+rec+meanrev"), + (["momentum_12_1", "recovery", "low_downside_beta", "short_term_reversal"], + [0.25, 0.25, 0.25, 0.25], "mom+rec+dbeta+STR"), + (["momentum_12_1", "recovery", "drawdown_recovery", "mean_rev_zscore"], + [0.25, 0.25, 0.25, 0.25], "mom+rec+ddrec+meanrev"), + ] + else: # cn + tests = [ + (["momentum_12_1", "anti_lottery"], [0.5, 0.5], "mom+anti_lottery"), + (["momentum_12_1", "inverse_vol"], [0.5, 0.5], "mom+inv_vol"), + (["momentum_12_1", "vol_reversal"], [0.5, 0.5], "mom+vol_reversal"), + (["momentum_12_1", "near_52w_high"], [0.5, 0.5], "mom+near52wh"), + (["momentum_12_1", "anti_lottery", "inverse_vol"], [0.33, 0.33, 0.34], "mom+alot+invvol"), + (["momentum_12_1", "anti_lottery", "vol_reversal"], [0.33, 0.33, 0.34], "mom+alot+volrev"), + (["momentum_12_1", "anti_lottery", "near_52w_high"], [0.33, 0.33, 0.34], "mom+alot+near52w"), + (["momentum_12_1", "recovery", "anti_lottery"], [0.33, 0.33, 0.34], "mom+rec+alot"), + (["momentum_12_1", "anti_lottery", "inverse_vol", "vol_reversal"], + [0.25, 0.25, 0.25, 0.25], "mom+alot+invvol+volrev"), + (["momentum_12_1", "anti_lottery", "near_52w_high", "vol_reversal"], + [0.25, 0.25, 0.25, 0.25], "mom+alot+52wh+volrev"), + ] + + # Also test the existing recovery+momentum baseline + baseline = test_factor_combination(factors, ["momentum_12_1", "recovery"], [0.5, 0.5], stocks, "BASELINE: mom+recovery") + if baseline: + combos.append(baseline) + + for names, weights, label in tests: + if all(n in factors for n in names): + result = test_factor_combination(factors, names, weights, stocks, label) + if result: + combos.append(result) + + combo_df = pd.DataFrame(combos).set_index("combo").sort_values("icir_5d", ascending=False) + print(combo_df.round(4).to_string()) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--market", default="us", choices=["us", "cn"]) + args = parser.parse_args() + run_analysis(args.market) + + +if __name__ == "__main__": + main() diff --git a/factor_final_check.py b/factor_final_check.py new file mode 100644 index 0000000..f34676a --- /dev/null +++ b/factor_final_check.py @@ -0,0 +1,150 @@ +"""Final robustness check on champions from the discovery loop.""" + +from __future__ import annotations +import warnings +import numpy as np +import pandas as pd +import data_manager +from universe import UNIVERSES +from factor_loop import ( + strat, bt, stats, combo, yearly, + f_rec_mom, f_rec_126, f_rec_63, + f_mom_12_1, f_mom_6_1, f_mom_intermediate, + f_above_ma200, f_golden_cross, + f_up_volume_proxy, f_gap_up_freq, + f_rec_mom_filtered, f_down_resilience, + f_up_capture, f_52w_high, f_str_10d, + f_earnings_drift, f_reversal_vol, +) + 
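+# The helpers imported above are defined in factor_loop.py within this
+# patch: combo([(f_a, w_a), (f_b, w_b)]) returns a factor function that
+# scores w_a * rank(f_a) + w_b * rank(f_b) cross-sectionally each day,
+# strat() turns a factor into lagged top-N equal weights, and bt() prices
+# the weights with turnover costs. A minimal usage sketch (not executed
+# here), using only names imported above:
+#
+#   f_baseline = combo([(f_rec_63, 0.5), (f_mom_12_1, 0.5)])
+#   equity = bt(strat(prices, f_baseline, top_n=10), prices, cost=0.001)
+#   print(stats(equity))  # dict with cagr / sharpe / sortino / maxdd / calmar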
+
+warnings.filterwarnings("ignore")
+
+
+def f_quality_mom(p):
+    mom = f_mom_12_1(p)
+    consist_ret = p.pct_change()
+    consist = (consist_ret > 0).astype(float).rolling(252, min_periods=126).mean()
+    mom_r = mom.rank(axis=1, pct=True, na_option="keep")
+    con_r = consist.rank(axis=1, pct=True, na_option="keep")
+    up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep")
+    return 0.4 * mom_r + 0.3 * con_r + 0.3 * up_r
+
+
+def f_mom_x_gap(p):
+    mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep")
+    gap_r = f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep")
+    return mom_r * gap_r
+
+
+def rolling_2yr(eq):
+    """Rolling 2-year (504-day) annualized return and Sharpe, stepped quarterly."""
+    dr = eq.pct_change().dropna()
+    results = []
+    for end_i in range(504, len(dr), 63):
+        chunk = dr.iloc[end_i - 504:end_i]
+        tot = (1 + chunk).prod() - 1
+        ann = (1 + tot) ** (252 / len(chunk)) - 1
+        sh = chunk.mean() / chunk.std() * np.sqrt(252) if chunk.std() > 0 else 0
+        results.append({"end": chunk.index[-1].date(), "ann": ann, "sh": sh})
+    return pd.DataFrame(results)
+
+
+def run_robustness(name, func, prices, label_prefix):
+    # label_prefix is currently unused; kept for the US/CN call sites below
+    print(f"\n  {name}:")
+
+    # Top-N sensitivity
+    print("    Top-N:  ", end="")
+    for n in [5, 10, 15, 20]:
+        w = strat(prices, func, top_n=n)
+        eq = bt(w, prices)
+        s = stats(eq)
+        print(f"N={n}: {s['cagr']:+.1%}/{s['sharpe']:.2f}  ", end="")
+    print()
+
+    # Rebalancing-frequency sensitivity
+    print("    Rebal:  ", end="")
+    for r in [5, 10, 21, 42]:
+        w = strat(prices, func, top_n=10, rebal=r)
+        eq = bt(w, prices)
+        s = stats(eq)
+        print(f"{r}d: {s['cagr']:+.1%}/{s['sharpe']:.2f}  ", end="")
+    print()
+
+    # Transaction-cost sensitivity
+    print("    Cost:   ", end="")
+    for c in [0, 0.001, 0.002, 0.005]:
+        w = strat(prices, func, top_n=10)
+        eq = bt(w, prices, cost=c)
+        s = stats(eq)
+        print(f"{c*1e4:.0f}bp: {s['cagr']:+.1%}  ", end="")
+    print()
+
+    # Rolling 2-year windows
+    w = strat(prices, func, top_n=10)
+    eq = bt(w, prices)
+    roll = rolling_2yr(eq)
+    if not roll.empty:
+        pct_pos = (roll["ann"] > 0).mean()
+        print(f"    2yr rolling: mean={roll['ann'].mean():+.1%} min={roll['ann'].min():+.1%} "
+              f"max={roll['ann'].max():+.1%} %pos={pct_pos:.0%} mean_sharpe={roll['sh'].mean():.2f}")
+
+
+def main():
+    # ============= US =============
+    prices_us = data_manager.load("us")
+    stocks_us = prices_us.drop(columns=["SPY"], errors="ignore")
+
+    print("=" * 95)
+    print(" US FINAL ROBUSTNESS — Champions vs Baseline")
+    print("=" * 95)
+
+    us_champs = [
+        ("BASELINE: rec+mom", f_rec_mom),
+        # Nested combo: 50% rec_mom_filtered, 50% blend of deep recovery × up-volume
+        ("rec_mom_filt + rec_deep×upvol",
+         combo([
+             (f_rec_mom_filtered, 0.5),
+             (combo([(f_rec_126, 0.5), (f_up_volume_proxy, 0.5)]), 0.5),
+         ])),
+        ("above_ma200+mom_7m+rec_126d",
+         combo([(f_above_ma200, 0.33), (f_mom_intermediate, 0.33), (f_rec_126, 0.34)])),
+        ("rec_mom_filtered+above_ma200",
+         combo([(f_rec_mom_filtered, 0.5), (f_above_ma200, 0.5)])),
+        ("mom_7m+rec_126d",
+         combo([(f_mom_intermediate, 0.5), (f_rec_126, 0.5)])),
+    ]
+
+    for name, func in us_champs:
+        run_robustness(name, func, stocks_us, "US")
+
+    # ============= CN =============
+    prices_cn = data_manager.load("cn")
+    stocks_cn = prices_cn.drop(columns=["000300.SS"], errors="ignore")
+
+    print(f"\n{'='*95}")
+    print(" CN FINAL ROBUSTNESS — Champions vs Baseline")
+    print("=" * 95)
+
+    cn_champs = [
+        ("BASELINE: rec+mom", f_rec_mom),
+        ("up_capture+quality_mom",
+         combo([(f_up_capture, 0.5), (f_quality_mom,
0.5)])), + ("recovery_63d+mom×gap", + combo([(f_rec_63, 0.5), (f_mom_x_gap, 0.5)])), + ("down_resilience+quality_mom", + combo([(f_down_resilience, 0.5), (f_quality_mom, 0.5)])), + ("up_capture+mom×gap", + combo([(f_up_capture, 0.5), (f_mom_x_gap, 0.5)])), + ] + + for name, func in cn_champs: + run_robustness(name, func, stocks_cn, "CN") + + +if __name__ == "__main__": + main() diff --git a/factor_loop.py b/factor_loop.py new file mode 100644 index 0000000..f3ea275 --- /dev/null +++ b/factor_loop.py @@ -0,0 +1,654 @@ +""" +Iterative Factor Discovery Loop. + +Round 1: Academic & practitioner hypotheses (30+ factors) +Round 2: Data-driven variations on Round 1 winners +Round 3: Interaction and conditional factors +Round 4: Parameter optimization on finalists +Round 5: Best combinations + +Each factor is tested immediately as a top-10 equal-weight strategy +with monthly rebalancing and 10bps transaction costs. +""" + +from __future__ import annotations + +import argparse +import warnings +from typing import Callable + +import numpy as np +import pandas as pd + +import data_manager +from universe import UNIVERSES + +warnings.filterwarnings("ignore") + +FactorFunc = Callable[[pd.DataFrame], pd.DataFrame] + + +# --------------------------------------------------------------------------- +# Backtest infrastructure +# --------------------------------------------------------------------------- + +def strat( + prices: pd.DataFrame, + signal_func: FactorFunc, + top_n: int = 10, + rebal: int = 21, + warmup: int = 252, +) -> pd.DataFrame: + sig = signal_func(prices) + rank = sig.rank(axis=1, ascending=False, na_option="bottom") + n_valid = sig.notna().sum(axis=1) + enough = n_valid >= top_n + mask = (rank <= top_n) & enough.values.reshape(-1, 1) + raw = mask.astype(float) + w = raw.div(raw.sum(axis=1).replace(0, np.nan), axis=0).fillna(0.0) + rmask = pd.Series(False, index=prices.index) + rmask.iloc[list(range(warmup, len(prices), rebal))] = True + w[~rmask] = np.nan + w = w.ffill().fillna(0.0) + w.iloc[:warmup] = 0.0 + return w.shift(1).fillna(0.0) + + +def bt(weights: pd.DataFrame, prices: pd.DataFrame, cost: float = 0.001) -> pd.Series: + ret = prices.pct_change().fillna(0.0) + pr = (weights * ret).sum(axis=1) + pr -= weights.diff().abs().sum(axis=1) * cost + return (1 + pr).cumprod() * 100000 + + +def stats(eq: pd.Series) -> dict: + dr = eq.pct_change().dropna() + if len(dr) < 200 or dr.std() == 0: + return {"cagr": np.nan, "sharpe": np.nan, "sortino": np.nan, + "maxdd": np.nan, "calmar": np.nan} + ny = len(dr) / 252 + tot = eq.iloc[-1] / eq.iloc[0] - 1 + cagr = (1 + tot) ** (1 / ny) - 1 + sh = dr.mean() / dr.std() * np.sqrt(252) + sd = dr[dr < 0].std() + so = dr.mean() / sd * np.sqrt(252) if sd > 0 else 0 + rm = eq.cummax() + dd = ((eq - rm) / rm).min() + cal = cagr / abs(dd) if dd != 0 else 0 + return {"cagr": cagr, "sharpe": sh, "sortino": so, "maxdd": dd, "calmar": cal} + + +def yearly(eq: pd.Series) -> dict[int, float]: + dr = eq.pct_change().fillna(0) + return {y: float((1 + dr[dr.index.year == y]).prod() - 1) for y in sorted(dr.index.year.unique())} + + +def test_factor(name: str, func: FactorFunc, prices: pd.DataFrame, + top_n: int = 10) -> dict: + w = strat(prices, func, top_n=top_n) + eq = bt(w, prices) + s = stats(eq) + s["name"] = name + s["equity"] = eq + return s + + +def combo(fws: list[tuple[FactorFunc, float]]) -> FactorFunc: + def _c(p): + return sum(w * f(p).rank(axis=1, pct=True, na_option="keep") for f, w in fws) + return _c + + +def print_results(results: list[dict], title: 
str): + df = pd.DataFrame([{k: v for k, v in r.items() if k != "equity"} for r in results]) + df = df.set_index("name").sort_values("cagr", ascending=False) + print(f"\n{'='*95}") + print(f" {title}") + print(f"{'='*95}") + print(f" {'Factor':<45} {'CAGR':>7} {'Sharpe':>7} {'Sortino':>8} {'MaxDD':>7} {'Calmar':>7}") + print(f" {'-'*85}") + for name, row in df.iterrows(): + flag = " <<<" if "BASELINE" in str(name) else "" + c = row['cagr'] + if np.isnan(c): + continue + print(f" {str(name):<45} {c:>+6.1%} {row['sharpe']:>7.2f} {row['sortino']:>8.2f} " + f"{row['maxdd']:>+6.1%} {row['calmar']:>7.2f}{flag}") + return df + + +# ===================================================================== +# ROUND 1 — Academic & Practitioner Hypotheses +# ===================================================================== + +# --- Momentum family --- +def f_mom_12_1(p): return p.shift(21).pct_change(231) +def f_mom_6_1(p): return p.shift(21).pct_change(105) +def f_mom_3_1(p): return p.shift(21).pct_change(42) +def f_mom_1_0(p): return p.pct_change(21) # 1-month (reversal in US) + +# --- Recovery family --- +def f_rec_63(p): return p / p.rolling(63, min_periods=63).min() - 1 +def f_rec_126(p): return p / p.rolling(126, min_periods=126).min() - 1 +def f_rec_21(p): return p / p.rolling(21, min_periods=21).min() - 1 + +# Novy-Marx 2012: intermediate momentum (7-12 month) +def f_mom_intermediate(p): return p.shift(21).pct_change(147) # ~7 month + +# Asness et al: quality/profitability proxy via return consistency +def f_consistent_returns(p): + ret = p.pct_change() + return (ret > 0).astype(float).rolling(252, min_periods=126).mean() + +# Da, Liu, Schaumburg 2014: information discreteness +# Stocks with many small positive days > stocks with few large positive days +def f_info_discrete(p): + ret = p.pct_change() + n_pos = (ret > 0).astype(float).rolling(60, min_periods=40).sum() + sum_pos = ret.where(ret > 0, 0).rolling(60, min_periods=40).sum() + avg_pos = sum_pos / n_pos.replace(0, np.nan) + # High count of positive days + low average positive = smooth accumulation + return n_pos / avg_pos.replace(0, np.nan) + +# Accumulation proxy (worked in Round 1) +def f_up_volume_proxy(p): + ret = p.pct_change() + return ret.where(ret > 0, 0).rolling(20, min_periods=15).sum() + +# George & Hwang 2004: 52-week high ratio +def f_52w_high(p): + return p / p.rolling(252, min_periods=126).max() + +# Frequency of large up-moves (worked in Round 1) +def f_gap_up_freq(p): + ret = p.pct_change() + return (ret > 0.01).astype(float).rolling(60, min_periods=40).mean() + +# Bali, Cakici, Whitelaw 2011: MAX effect (lottery demand) +def f_anti_max(p): + ret = p.pct_change() + return -ret.rolling(20, min_periods=15).max() + +# Ang et al 2006: idiosyncratic volatility (negative) +def f_neg_ivol(p): + ret = p.pct_change() + return -ret.rolling(20, min_periods=15).std() + +# Blitz & van Vliet 2007: low volatility anomaly +def f_low_vol_60(p): + ret = p.pct_change() + return -ret.rolling(60, min_periods=40).std() + +# Hurst exponent proxy — autocorrelation of returns +# Stocks with positive autocorrelation = trending +def f_autocorrelation(p): + ret = p.pct_change() + def _ac(x): + x = x.dropna() + if len(x) < 20: + return np.nan + return np.corrcoef(x[:-1], x[1:])[0, 1] + return ret.rolling(60, min_periods=40).apply(_ac, raw=False) + +# Short-term reversal (Jegadeesh 1990) +def f_str_5d(p): return -p.pct_change(5) +def f_str_10d(p): return -p.pct_change(10) + +# Earnings drift proxy (worked in Round 1) +def f_earnings_drift(p): + 
ret_5d = p.pct_change(5) + vol = p.pct_change().rolling(60, min_periods=40).std() * np.sqrt(5) + z = ret_5d / vol.replace(0, np.nan) + return z.rolling(60, min_periods=20).mean() + +# Risk-adjusted momentum (Sharpe-momentum) +def f_sharpe_mom(p): + ret = p.pct_change() + mu = ret.rolling(252, min_periods=126).mean() + sigma = ret.rolling(252, min_periods=126).std() + return mu / sigma.replace(0, np.nan) + +# Trend strength: slope of log-price regression +def f_trend_slope(p): + log_p = np.log(p.replace(0, np.nan)) + def _slope(x): + x = x.dropna().values + if len(x) < 30: + return np.nan + t = np.arange(len(x), dtype=float) + t -= t.mean() + return (t * (x - x.mean())).sum() / (t * t).sum() + return log_p.rolling(60, min_periods=30).apply(_slope, raw=False) + +# Acceleration: recent momentum vs. longer-term momentum +def f_mom_accel(p): + m3 = p.shift(5).pct_change(58) # ~3mo + m12 = p.shift(21).pct_change(231) # ~12mo + return m3 - m12 + +# Mean reversion z-score +def f_mean_rev_z(p): + ma20 = p.rolling(20, min_periods=20).mean() + vol = p.pct_change().rolling(60, min_periods=40).std() * p + return -(p - ma20) / vol.replace(0, np.nan) + +# Price relative to moving averages +def f_above_ma200(p): + return p / p.rolling(200, min_periods=200).mean() - 1 + +def f_above_ma50(p): + return p / p.rolling(50, min_periods=50).mean() - 1 + +# Dual MA signal: 50-day MA / 200-day MA +def f_golden_cross(p): + ma50 = p.rolling(50, min_periods=50).mean() + ma200 = p.rolling(200, min_periods=200).mean() + return ma50 / ma200 - 1 + +# Drawdown recovery rate +def f_dd_recovery_rate(p): + rm = p.rolling(252, min_periods=126).max() + dd = p / rm - 1 # negative when in drawdown + return dd - dd.shift(20) # positive = recovering from drawdown + +# A-share specific: short-term reversal x volatility +def f_reversal_vol(p): + return -p.pct_change(5) * p.pct_change().rolling(20, min_periods=15).std() + +# Recovery + momentum (baseline) +def f_rec_mom(p): + r1 = f_rec_63(p).rank(axis=1, pct=True, na_option="keep") + r2 = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") + return 0.5 * r1 + 0.5 * r2 + + +# ===================================================================== +# ROUND 2 — Second-order ideas from Round 1 analysis +# ===================================================================== + +# The key insight: "quality of returns" matters more than "magnitude of returns" +# Factors that measure HOW a stock goes up, not just that it went up. + +# Smoothness-weighted momentum +def f_smooth_momentum(p): + """Momentum penalized by path volatility. Stocks that go up smoothly.""" + mom = p.shift(21).pct_change(231) + ret = p.pct_change() + vol = ret.rolling(252, min_periods=126).std() + return mom / (vol.replace(0, np.nan) ** 0.5) # sqrt to dampen + +# Positive return ratio (like Sharpe numerator) +def f_pos_ratio_60(p): + """Fraction of positive return days in 60 days. 
Quality signal.""" + ret = p.pct_change() + return (ret > 0).astype(float).rolling(60, min_periods=40).mean() + +# Cumulative positive returns vs cumulative negative returns +def f_up_down_asymmetry(p): + """Ratio of cumulative up-move to cumulative down-move.""" + ret = p.pct_change() + up = ret.where(ret > 0, 0).rolling(60, min_periods=40).sum() + down = (-ret.where(ret < 0, 0)).rolling(60, min_periods=40).sum() + return up / down.replace(0, np.nan) + +# Streak momentum: max consecutive up days in last 40 days +def f_max_streak(p): + ret = p.pct_change() + pos = (ret > 0).astype(float) + def _max_streak(x): + x = x.dropna().values + if len(x) == 0: + return 0 + best = cur = 0 + for v in x: + cur = cur + 1 if v > 0.5 else 0 + best = max(best, cur) + return best + return pos.rolling(40, min_periods=20).apply(_max_streak, raw=False) + +# Overnight proxy: gap between yesterday's close and today's pattern +# Since we only have close prices, use close-to-close 1d return decomposition +def f_up_capture(p): + """Up-market capture ratio over 60 days.""" + ret = p.pct_change() + mkt = ret.mean(axis=1) + up_mkt = mkt > 0 + arr = ret.values.copy() + arr[~up_mkt.values, :] = np.nan + stock_up = pd.DataFrame(arr, index=ret.index, columns=ret.columns) + mkt_up_vals = mkt.where(up_mkt, np.nan) + stock_avg = stock_up.rolling(60, min_periods=20).mean() + mkt_avg = mkt_up_vals.rolling(60, min_periods=20).mean() + return stock_avg.div(mkt_avg, axis=0) + +# Down-market resilience +def f_down_resilience(p): + """How much LESS a stock falls on down-market days.""" + ret = p.pct_change() + mkt = ret.mean(axis=1) + down_mkt = mkt < 0 + arr = ret.values.copy() + arr[~down_mkt.values, :] = np.nan + down_ret = pd.DataFrame(arr, index=ret.index, columns=ret.columns) + return -down_ret.rolling(120, min_periods=30).mean() + +# Recovery from rolling max with momentum filter +def f_rec_mom_filtered(p): + """Recovery factor only for stocks with positive 6-month momentum. + Filters out dead-cat bounces.""" + rec = p / p.rolling(126, min_periods=126).min() - 1 + mom = p.shift(21).pct_change(105) + return rec.where(mom > 0, np.nan) + +# Information discreteness v2: using the sign ratio +def f_sign_ratio(p): + """Ratio of (count of positive days)^2 * avg_size to total return. + High ratio = many small ups = institutional flow.""" + ret = p.pct_change() + n_total = 60 + n_pos = (ret > 0).astype(float).rolling(n_total, min_periods=40).sum() + total_ret = ret.rolling(n_total, min_periods=40).sum() + sign_vol = n_pos / n_total + # Stocks where most of the return comes from many small positive days + return sign_vol * total_ret.clip(lower=0) + + +# ===================================================================== +# ROUND 3 — Interaction & conditional factors +# ===================================================================== + +def f_mom_x_recovery(p): + """Momentum × Recovery interaction. 
The product, not the sum.""" + mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") + rec_r = f_rec_63(p).rank(axis=1, pct=True, na_option="keep") + return mom_r * rec_r + +def f_mom_x_upvol(p): + """Momentum × Up-volume-proxy interaction.""" + mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") + up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep") + return mom_r * up_r + +def f_rec_deep_x_upvol(p): + """Deep recovery × Up-volume interaction.""" + rec_r = f_rec_126(p).rank(axis=1, pct=True, na_option="keep") + up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep") + return rec_r * up_r + +def f_trend_x_mom(p): + """Trend strength × Momentum. Trending + momentum = double signal.""" + tr_r = f_trend_slope(p).rank(axis=1, pct=True, na_option="keep") + mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") + return tr_r * mom_r + +def f_quality_mom(p): + """Momentum filtered by consistency. Only persistent winners.""" + mom = f_mom_12_1(p) + consist = f_consistent_returns(p) + mom_r = mom.rank(axis=1, pct=True, na_option="keep") + con_r = consist.rank(axis=1, pct=True, na_option="keep") + return 0.4 * mom_r + 0.3 * con_r + 0.3 * f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep") + +def f_rec_deep_x_gap(p): + """Deep recovery × gap-up frequency.""" + rec_r = f_rec_126(p).rank(axis=1, pct=True, na_option="keep") + gap_r = f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep") + return rec_r * gap_r + +def f_mom_x_gap(p): + """Momentum × gap-up frequency.""" + mom_r = f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") + gap_r = f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep") + return mom_r * gap_r + +# Regime-conditional: momentum with volatility filter +def f_mom_low_vol_regime(p): + """Momentum only when market vol is below median. 
+ Momentum crashes in high-vol regimes.""" + mom = f_mom_12_1(p) + mkt_vol = p.pct_change().mean(axis=1).rolling(60).std() + vol_median = mkt_vol.rolling(252, min_periods=126).median() + low_vol = mkt_vol <= vol_median + mask = pd.DataFrame( + np.tile(low_vol.values[:, None], (1, mom.shape[1])), + index=mom.index, columns=mom.columns, + ) + return mom.where(mask, 0) + + +# ===================================================================== +# Main loop +# ===================================================================== + +def run_round( + name: str, + factors: list[tuple[str, FactorFunc]], + prices: pd.DataFrame, + top_n: int = 10, +) -> list[dict]: + results = [] + for fname, func in factors: + r = test_factor(fname, func, prices, top_n=top_n) + results.append(r) + print_results(results, name) + return results + + +def run_market(market: str): + config = UNIVERSES[market] + benchmark = config["benchmark"] + prices = data_manager.load(market) + bench = prices[benchmark].dropna() if benchmark in prices.columns else None + stocks = prices.drop(columns=[benchmark], errors="ignore") + print(f"\n{'#'*95}") + print(f" FACTOR DISCOVERY LOOP — {market.upper()} MARKET") + print(f" {stocks.shape[1]} stocks, {stocks.shape[0]} days, " + f"{stocks.index[0].date()} → {stocks.index[-1].date()}") + print(f"{'#'*95}") + + # Benchmark + if bench is not None: + eq_bench = bench / bench.iloc[0] * 100000 + bs = stats(eq_bench) + print(f"\n Benchmark: CAGR {bs['cagr']:+.1%}, Sharpe {bs['sharpe']:.2f}") + + # ================================================================ + # ROUND 1: Academic & practitioner factors + # ================================================================ + r1_factors = [ + ("BASELINE:rec+mom", f_rec_mom), + # Momentum family + ("mom_12_1", f_mom_12_1), + ("mom_6_1", f_mom_6_1), + ("mom_3_1", f_mom_3_1), + ("mom_1_0", f_mom_1_0), + ("mom_intermediate_7m", f_mom_intermediate), + ("sharpe_momentum", f_sharpe_mom), + # Recovery family + ("recovery_63d", f_rec_63), + ("recovery_126d", f_rec_126), + ("recovery_21d", f_rec_21), + # Trend + ("trend_slope_60d", f_trend_slope), + ("golden_cross", f_golden_cross), + ("above_ma200", f_above_ma200), + # Volatility + ("low_vol_60d", f_low_vol_60), + ("neg_ivol_20d", f_neg_ivol), + # Reversal + ("STR_5d", f_str_5d), + ("STR_10d", f_str_10d), + # Quality / accumulation + ("consistent_returns", f_consistent_returns), + ("up_volume_proxy", f_up_volume_proxy), + ("gap_up_freq", f_gap_up_freq), + ("info_discrete", f_info_discrete), + ("earnings_drift", f_earnings_drift), + # Other academic + ("52w_high", f_52w_high), + ("anti_max_20d", f_anti_max), + ("dd_recovery_rate", f_dd_recovery_rate), + ("mom_acceleration", f_mom_accel), + ] + if market == "cn": + r1_factors.append(("reversal_vol_cn", f_reversal_vol)) + r1 = run_round("ROUND 1 — Academic & Practitioner Hypotheses", r1_factors, stocks) + + # Identify top-10 from round 1 + r1_sorted = sorted(r1, key=lambda x: x.get("cagr", 0) or 0, reverse=True) + r1_top_names = [r["name"] for r in r1_sorted[:10] if r.get("cagr") and r["cagr"] > 0] + baseline_cagr = next((r["cagr"] for r in r1 if "BASELINE" in r["name"]), 0) + print(f"\n Baseline CAGR: {baseline_cagr:+.1%}") + print(f" Top 10: {r1_top_names}") + + # ================================================================ + # ROUND 2: Second-order ideas based on what worked + # ================================================================ + r2_factors = [ + ("BASELINE:rec+mom", f_rec_mom), + ("smooth_momentum", f_smooth_momentum), + 
("pos_ratio_60d", f_pos_ratio_60), + ("up_down_asymmetry", f_up_down_asymmetry), + ("max_streak_40d", f_max_streak), + ("up_capture_60d", f_up_capture), + ("down_resilience_120d", f_down_resilience), + ("rec_mom_filtered", f_rec_mom_filtered), + ("sign_ratio", f_sign_ratio), + ("autocorrelation_60d", f_autocorrelation), + ("mean_rev_z", f_mean_rev_z), + ] + r2 = run_round("ROUND 2 — Return Quality & Behavioral Factors", r2_factors, stocks) + + # ================================================================ + # ROUND 3: Interaction & conditional factors + # ================================================================ + r3_factors = [ + ("BASELINE:rec+mom", f_rec_mom), + ("mom×recovery", f_mom_x_recovery), + ("mom×upvol", f_mom_x_upvol), + ("rec_deep×upvol", f_rec_deep_x_upvol), + ("trend×mom", f_trend_x_mom), + ("quality_mom", f_quality_mom), + ("rec_deep×gap", f_rec_deep_x_gap), + ("mom×gap", f_mom_x_gap), + ("mom_low_vol_regime", f_mom_low_vol_regime), + ] + r3 = run_round("ROUND 3 — Interaction & Conditional Factors", r3_factors, stocks) + + # ================================================================ + # Collect ALL results from all rounds + # ================================================================ + all_results = r1 + r2 + r3 + # Deduplicate baseline + seen = set() + unique = [] + for r in all_results: + if r["name"] not in seen: + seen.add(r["name"]) + unique.append(r) + unique_sorted = sorted(unique, key=lambda x: x.get("cagr", 0) or 0, reverse=True) + + print(f"\n{'='*95}") + print(f" ALL ROUNDS COMBINED — TOP 15 FACTORS — {market.upper()}") + print(f"{'='*95}") + print(f" {'Factor':<45} {'CAGR':>7} {'Sharpe':>7} {'Sortino':>8} {'MaxDD':>7} {'Calmar':>7}") + print(f" {'-'*85}") + for r in unique_sorted[:15]: + flag = " <<<" if "BASELINE" in r["name"] else "" + print(f" {r['name']:<45} {r['cagr']:>+6.1%} {r['sharpe']:>7.2f} {r['sortino']:>8.2f} " + f"{r['maxdd']:>+6.1%} {r['calmar']:>7.2f}{flag}") + + # ================================================================ + # ROUND 4: Combine top non-baseline factors + # ================================================================ + top_funcs = {} + func_map_all = dict(r1_factors + r2_factors + r3_factors) + non_baseline = [r for r in unique_sorted if "BASELINE" not in r["name"] and r.get("cagr", 0)] + for r in non_baseline[:12]: + if r["name"] in func_map_all: + top_funcs[r["name"]] = func_map_all[r["name"]] + + top_names = list(top_funcs.keys()) + print(f"\n Building combinations from top {len(top_names)} factors: {top_names}") + + combo_factors = [("BASELINE:rec+mom", f_rec_mom)] + + # All pairs from top-8 + for i in range(min(8, len(top_names))): + for j in range(i + 1, min(8, len(top_names))): + n1, n2 = top_names[i], top_names[j] + combo_factors.append(( + f"{n1[:20]}+{n2[:20]}", + combo([(top_funcs[n1], 0.5), (top_funcs[n2], 0.5)]) + )) + + # Triple combos from top-5 + for i in range(min(5, len(top_names))): + for j in range(i + 1, min(5, len(top_names))): + for k in range(j + 1, min(5, len(top_names))): + n1, n2, n3 = top_names[i], top_names[j], top_names[k] + combo_factors.append(( + f"{n1[:15]}+{n2[:15]}+{n3[:15]}", + combo([(top_funcs[n1], 0.33), (top_funcs[n2], 0.33), (top_funcs[n3], 0.34)]) + )) + + r4 = run_round("ROUND 4 — Factor Combinations", combo_factors, stocks) + + # ================================================================ + # ROUND 5: Yearly breakdown of top 5 combos + # ================================================================ + r4_sorted = sorted(r4, key=lambda x: 
x.get("cagr", 0) or 0, reverse=True) + top5 = r4_sorted[:5] + # Make sure baseline is included + base = next((r for r in r4 if "BASELINE" in r["name"]), None) + if base and base not in top5: + top5.append(base) + + print(f"\n{'='*95}") + print(f" ROUND 5 — YEARLY RETURNS OF BEST STRATEGIES — {market.upper()}") + print(f"{'='*95}") + + cols = [(r["name"], r["equity"]) for r in top5] + if bench is not None: + eq_bench = bench / bench.iloc[0] * 100000 + cols.append(("BENCHMARK", eq_bench)) + + # Header + header = f" {'Year':<6}" + for name, _ in cols: + header += f" | {name[:22]:>22}" + print(header) + print(" " + "-" * (6 + 25 * len(cols))) + + all_years = sorted(set(y for _, eq in cols for y in eq.index.year.unique())) + for year in all_years: + line = f" {year:<6}" + for _, eq in cols: + dr = eq.pct_change().fillna(0) + yr = dr[dr.index.year == year] + r = float((1 + yr).prod() - 1) if len(yr) > 0 else 0 + line += f" | {r:>+21.1%}" + print(line) + + # Period CAGRs + for ny in [3, 5, 10]: + cutoff = stocks.index[-1] - pd.DateOffset(years=ny) + print(f"\n --- {ny}-year CAGR ---") + for name, eq in cols: + sl = eq[eq.index >= cutoff] + if len(sl) < 50: + continue + tot = sl.iloc[-1] / sl.iloc[0] - 1 + cagr = (1 + tot) ** (1 / ny) - 1 + print(f" {name[:50]:<50} {cagr:>+8.1%}") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--market", default="us", choices=["us", "cn"]) + args = parser.parse_args() + run_market(args.market) + + +if __name__ == "__main__": + main() diff --git a/factor_real_backtest.py b/factor_real_backtest.py new file mode 100644 index 0000000..810e22b --- /dev/null +++ b/factor_real_backtest.py @@ -0,0 +1,449 @@ +""" +Factor research v2 — Portfolio-first approach. + +Instead of IC → portfolio, we go directly to: + 1. Build factor signal + 2. Select top-N stocks + 3. Run real backtest with transaction costs + 4. Measure CAGR, Sharpe, MaxDD, yearly returns + +Tests single factors AND combinations. Compares everything against +the baseline recovery+momentum strategy. +""" + +from __future__ import annotations + +import argparse +import warnings + +import numpy as np +import pandas as pd + +import data_manager +import metrics +from universe import UNIVERSES + +warnings.filterwarnings("ignore") + +# --------------------------------------------------------------------------- +# Factor signals — each returns DataFrame (dates x stocks), higher = better +# --------------------------------------------------------------------------- + +def f_momentum_12_1(p: pd.DataFrame) -> pd.DataFrame: + return p.shift(21).pct_change(231) + +def f_recovery(p: pd.DataFrame) -> pd.DataFrame: + return p / p.rolling(63, min_periods=63).min() - 1 + +def f_recovery_mom(p: pd.DataFrame) -> pd.DataFrame: + """The baseline composite: 50/50 recovery + momentum ranks.""" + r1 = f_recovery(p).rank(axis=1, pct=True, na_option="keep") + r2 = f_momentum_12_1(p).rank(axis=1, pct=True, na_option="keep") + return 0.5 * r1 + 0.5 * r2 + +# --- New single factors --- + +def f_short_term_reversal(p: pd.DataFrame) -> pd.DataFrame: + """5-day return reversal.""" + return -p.pct_change(5) + +def f_vol_adjusted_mom(p: pd.DataFrame) -> pd.DataFrame: + """Momentum divided by recent volatility. Sharpe-like signal. 
+ Hypothesis: risk-adjusted momentum is more persistent.""" + mom = p.shift(21).pct_change(231) + vol = p.pct_change().rolling(60, min_periods=40).std() + return mom / vol.replace(0, np.nan) + +def f_acceleration(p: pd.DataFrame) -> pd.DataFrame: + """3-month momentum minus 12-month momentum. + Hypothesis: accelerating stocks continue accelerating.""" + mom_3m = p.shift(5).pct_change(63 - 5) + mom_12m = p.shift(21).pct_change(231) + return mom_3m - mom_12m + +def f_breakout(p: pd.DataFrame) -> pd.DataFrame: + """Price relative to 20-day high. Close to 1 = breaking out. + Hypothesis: breakouts from consolidation continue.""" + return p / p.rolling(20, min_periods=20).max() + +def f_recovery_deep(p: pd.DataFrame) -> pd.DataFrame: + """Recovery from 126-day (6 month) low instead of 63-day. + Hypothesis: deeper recovery = stronger signal.""" + return p / p.rolling(126, min_periods=126).min() - 1 + +def f_recovery_rate(p: pd.DataFrame) -> pd.DataFrame: + """Speed of recovery: 20-day change in recovery factor. + Hypothesis: accelerating recovery predicts continuation.""" + recovery = p / p.rolling(63, min_periods=63).min() - 1 + return recovery - recovery.shift(20) + +def f_drawdown_bounce(p: pd.DataFrame) -> pd.DataFrame: + """20-day return from drawdown trough, only for stocks in drawdown. + Hypothesis: strong bounces from drawdowns persist.""" + rolling_max = p.rolling(252, min_periods=126).max() + in_drawdown = p < rolling_max * 0.9 # at least 10% below peak + bounce_20d = p.pct_change(20) + # Only score stocks that were recently in drawdown + was_in_drawdown = in_drawdown.rolling(20, min_periods=1).max().astype(bool) + return bounce_20d.where(was_in_drawdown, np.nan) + +def f_consistent_winner(p: pd.DataFrame) -> pd.DataFrame: + """Fraction of months with positive returns over past 12 months. + Hypothesis: stocks that win consistently are higher quality momentum.""" + monthly_ret = p.pct_change(21) + return (monthly_ret > 0).astype(float).rolling(252, min_periods=126).mean() + +def f_gap_up_freq(p: pd.DataFrame) -> pd.DataFrame: + """Fraction of days with >1% gain in past 60 days. + Hypothesis: frequent large gains = institutional buying.""" + ret = p.pct_change() + return (ret > 0.01).astype(float).rolling(60, min_periods=40).mean() + +def f_low_vol_mom(p: pd.DataFrame) -> pd.DataFrame: + """Momentum only among low-volatility stocks. Combined rank. + Hypothesis: low-vol momentum is more persistent.""" + mom = f_momentum_12_1(p).rank(axis=1, pct=True, na_option="keep") + vol = (-p.pct_change().rolling(60, min_periods=40).std()).rank(axis=1, pct=True, na_option="keep") + return 0.5 * mom + 0.5 * vol + +def f_52w_channel_position(p: pd.DataFrame) -> pd.DataFrame: + """Position within 252-day high-low channel. 1 = at high, 0 = at low. + Hypothesis: stocks near highs continue (anchoring + trend).""" + h = p.rolling(252, min_periods=126).max() + l = p.rolling(252, min_periods=126).min() + return (p - l) / (h - l).replace(0, np.nan) + +def f_up_volume_proxy(p: pd.DataFrame) -> pd.DataFrame: + """Proxy for accumulation: sum of returns on up days over 20 days. + Without volume data, use magnitude of positive returns as proxy.""" + ret = p.pct_change() + up_ret = ret.where(ret > 0, 0) + return up_ret.rolling(20, min_periods=15).sum() + +def f_relative_strength_ma(p: pd.DataFrame) -> pd.DataFrame: + """Price above 50-day MA relative to 200-day MA position. 
+ Dual MA trend strength.""" + ma50 = p.rolling(50, min_periods=50).mean() + ma200 = p.rolling(200, min_periods=200).mean() + above_50 = (p / ma50 - 1) + above_200 = (p / ma200 - 1) + return 0.5 * above_50 + 0.5 * above_200 + +def f_earnings_drift_proxy(p: pd.DataFrame) -> pd.DataFrame: + """Proxy for post-earnings drift using 5-day return spikes. + Identify large 5-day moves and bet on continuation. + Hypothesis: large moves driven by information continue.""" + ret_5d = p.pct_change(5) + vol = p.pct_change().rolling(60, min_periods=40).std() * np.sqrt(5) + z_score = ret_5d / vol.replace(0, np.nan) + # Smooth: average z-score over past 60 days to capture multiple events + return z_score.rolling(60, min_periods=20).mean() + +# --- A-share specific --- + +def f_reversal_vol_cn(p: pd.DataFrame) -> pd.DataFrame: + """Short-term reversal amplified by volatility. + High-vol oversold stocks bounce harder in A-shares.""" + ret_5d = p.pct_change(5) + vol = p.pct_change().rolling(20, min_periods=15).std() + # Oversold (negative return) + high vol = positive score + return -ret_5d * vol + +def f_momentum_6_1(p: pd.DataFrame) -> pd.DataFrame: + """6-1 month momentum. Shorter lookback may work better in A-shares.""" + return p.shift(21).pct_change(105) + +def f_recovery_narrow(p: pd.DataFrame) -> pd.DataFrame: + """Recovery from 21-day low. Faster recovery signal for A-shares.""" + return p / p.rolling(21, min_periods=21).min() - 1 + +def f_range_breakout_cn(p: pd.DataFrame) -> pd.DataFrame: + """Breakout from 60-day range. Tuned for A-share volatility.""" + h60 = p.rolling(60, min_periods=40).max() + l60 = p.rolling(60, min_periods=40).min() + mid = (h60 + l60) / 2 + rng = (h60 - l60) / mid.replace(0, np.nan) + position = (p - l60) / (h60 - l60).replace(0, np.nan) + # Reward stocks breaking out of narrow ranges + return position / rng.replace(0, np.nan) + + +# --------------------------------------------------------------------------- +# Strategy builder and backtester +# --------------------------------------------------------------------------- + +def make_strategy( + prices: pd.DataFrame, + signal_func, + top_n: int = 10, + rebal_freq: int = 21, + warmup: int = 252, +) -> pd.DataFrame: + """Turn a factor signal into a rebalanced top-N equal-weight strategy.""" + signal = signal_func(prices) + + rank = signal.rank(axis=1, ascending=False, na_option="bottom") + n_valid = signal.notna().sum(axis=1) + enough = n_valid >= top_n + top_mask = (rank <= top_n) & enough.values.reshape(-1, 1) + + raw = top_mask.astype(float) + row_sums = raw.sum(axis=1).replace(0, np.nan) + weights = raw.div(row_sums, axis=0).fillna(0.0) + + # Monthly rebalance + rebal_mask = pd.Series(False, index=prices.index) + rebal_indices = list(range(warmup, len(prices), rebal_freq)) + rebal_mask.iloc[rebal_indices] = True + weights[~rebal_mask] = np.nan + weights = weights.ffill().fillna(0.0) + weights.iloc[:warmup] = 0.0 + + return weights.shift(1).fillna(0.0) + + +def combo_signal(funcs_and_weights: list[tuple]) -> callable: + """Create a combined signal function from [(func, weight), ...].""" + def _combo(p: pd.DataFrame) -> pd.DataFrame: + ranked = [] + for func, w in funcs_and_weights: + sig = func(p) + ranked.append(w * sig.rank(axis=1, pct=True, na_option="keep")) + return sum(ranked) + return _combo + + +def run_backtest( + weights: pd.DataFrame, + prices: pd.DataFrame, + cost: float = 0.001, +) -> pd.Series: + """Vectorized backtest returning equity curve.""" + returns = prices.pct_change().fillna(0.0) + port_ret = (weights * 
returns).sum(axis=1) + turnover = weights.diff().abs().sum(axis=1) + port_ret -= turnover * cost + return (1 + port_ret).cumprod() * 100000 + + +def compute_stats(equity: pd.Series, label: str) -> dict: + """Compute strategy statistics.""" + daily_ret = equity.pct_change().dropna() + if len(daily_ret) < 100 or daily_ret.std() == 0: + return {"name": label, "cagr": np.nan, "sharpe": np.nan, "maxdd": np.nan, + "total": np.nan, "win_rate": np.nan} + + n_years = len(daily_ret) / 252 + total_ret = equity.iloc[-1] / equity.iloc[0] - 1 + cagr = (1 + total_ret) ** (1 / n_years) - 1 + sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252) + sortino_denom = daily_ret[daily_ret < 0].std() + sortino = daily_ret.mean() / sortino_denom * np.sqrt(252) if sortino_denom > 0 else 0 + running_max = equity.cummax() + maxdd = ((equity - running_max) / running_max).min() + calmar = cagr / abs(maxdd) if maxdd != 0 else 0 + win_rate = (daily_ret > 0).mean() + + return { + "name": label, "cagr": cagr, "sharpe": sharpe, "sortino": sortino, + "maxdd": maxdd, "calmar": calmar, "total": total_ret, "win_rate": win_rate, + } + + +def yearly_returns(equity: pd.Series) -> dict[int, float]: + daily_ret = equity.pct_change().fillna(0) + years = daily_ret.index.year + result = {} + for year in sorted(years.unique()): + mask = years == year + result[year] = float((1 + daily_ret[mask]).prod() - 1) + return result + + +def run(market: str): + config = UNIVERSES[market] + benchmark = config["benchmark"] + + print(f"Loading {market.upper()} price data...") + prices = data_manager.load(market) + bench = prices[benchmark].dropna() if benchmark in prices.columns else None + stocks = prices.drop(columns=[benchmark], errors="ignore") + print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} days") + print(f"Period: {stocks.index[0].date()} to {stocks.index[-1].date()}\n") + + # --- Define all strategies to test --- + strategies: list[tuple[str, callable]] = [] + + # Baseline + strategies.append(("BASELINE: recovery+mom", f_recovery_mom)) + + # Single factors + strategies.append(("momentum_12_1", f_momentum_12_1)) + strategies.append(("recovery", f_recovery)) + strategies.append(("vol_adj_momentum", f_vol_adjusted_mom)) + strategies.append(("acceleration", f_acceleration)) + strategies.append(("breakout_20d", f_breakout)) + strategies.append(("recovery_deep_126d", f_recovery_deep)) + strategies.append(("recovery_rate", f_recovery_rate)) + strategies.append(("drawdown_bounce", f_drawdown_bounce)) + strategies.append(("consistent_winner", f_consistent_winner)) + strategies.append(("gap_up_freq", f_gap_up_freq)) + strategies.append(("low_vol_momentum", f_low_vol_mom)) + strategies.append(("52w_channel_position", f_52w_channel_position)) + strategies.append(("up_volume_proxy", f_up_volume_proxy)) + strategies.append(("relative_strength_ma", f_relative_strength_ma)) + strategies.append(("earnings_drift_proxy", f_earnings_drift_proxy)) + + if market == "cn": + strategies.append(("reversal_vol_cn", f_reversal_vol_cn)) + strategies.append(("momentum_6_1", f_momentum_6_1)) + strategies.append(("recovery_narrow_21d", f_recovery_narrow)) + strategies.append(("range_breakout_cn", f_range_breakout_cn)) + + # Run all single-factor backtests + print("=" * 110) + print(f" SINGLE FACTOR BACKTESTS — {market.upper()} (Top 10, monthly rebal, 10bps cost)") + print("=" * 110) + + results = [] + equities = {} + for name, func in strategies: + print(f" Running: {name}...") + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks) + 
equities[name] = eq + results.append(compute_stats(eq, name)) + + # Benchmark + if bench is not None: + eq_bench = bench / bench.iloc[0] * 100000 + equities["BENCHMARK"] = eq_bench + results.append(compute_stats(eq_bench, "BENCHMARK")) + + # Print results table + df = pd.DataFrame(results).set_index("name") + df = df.sort_values("cagr", ascending=False) + print(f"\n{'Strategy':<30} {'CAGR':>8} {'Sharpe':>8} {'Sortino':>8} {'MaxDD':>8} {'Calmar':>8} {'Total':>10}") + print("-" * 90) + for name, row in df.iterrows(): + flag = " ***" if name == "BASELINE: recovery+mom" else "" + print(f"{name:<30} {row['cagr']:>+7.1%} {row['sharpe']:>8.2f} {row['sortino']:>8.2f} " + f"{row['maxdd']:>+7.1%} {row['calmar']:>8.2f} {row['total']:>+9.0%}{flag}") + + # --- Identify factors that beat or match baseline --- + baseline_cagr = df.loc["BASELINE: recovery+mom", "cagr"] + winners = df[df["cagr"] >= baseline_cagr * 0.8].index.tolist() + winners = [w for w in winners if w not in ("BASELINE: recovery+mom", "BENCHMARK")] + print(f"\nFactors within 80% of baseline CAGR ({baseline_cagr:.1%}): {winners}") + + # --- Test combinations of top performers --- + print(f"\n{'='*110}") + print(f" FACTOR COMBINATIONS — {market.upper()}") + print(f"{'='*110}") + + # Get top single factors + single_only = df.drop(["BASELINE: recovery+mom", "BENCHMARK"], errors="ignore") + top_singles = single_only.nlargest(8, "cagr").index.tolist() + print(f" Top 8 singles: {top_singles}\n") + + # Map names back to functions + func_map = dict(strategies) + + combos: list[tuple[str, callable]] = [] + # Baseline is always included + combos.append(("BASELINE: recovery+mom", f_recovery_mom)) + + # Top2 combinations + for i in range(min(6, len(top_singles))): + for j in range(i + 1, min(6, len(top_singles))): + n1, n2 = top_singles[i], top_singles[j] + label = f"{n1} + {n2}" + func = combo_signal([(func_map[n1], 0.5), (func_map[n2], 0.5)]) + combos.append((label, func)) + + # Recovery+mom + each top single (3-factor) + for name in top_singles[:6]: + if name in ("momentum_12_1", "recovery"): + continue + label = f"rec+mom + {name}" + func = combo_signal([ + (f_recovery, 0.33), (f_momentum_12_1, 0.33), (func_map[name], 0.34) + ]) + combos.append((label, func)) + + # Run combo backtests + combo_results = [] + for name, func in combos: + print(f" Running: {name}...") + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks) + equities[name] = eq + combo_results.append(compute_stats(eq, name)) + + combo_df = pd.DataFrame(combo_results).set_index("name") + combo_df = combo_df.sort_values("cagr", ascending=False) + + print(f"\n{'Combo':<55} {'CAGR':>8} {'Sharpe':>8} {'Sortino':>8} {'MaxDD':>8} {'Calmar':>8}") + print("-" * 105) + for name, row in combo_df.iterrows(): + flag = " ***" if name == "BASELINE: recovery+mom" else "" + print(f"{name:<55} {row['cagr']:>+7.1%} {row['sharpe']:>8.2f} {row['sortino']:>8.2f} " + f"{row['maxdd']:>+7.1%} {row['calmar']:>8.2f}{flag}") + + # --- Yearly breakdown for top 3 combos --- + top3 = combo_df.nlargest(3, "cagr").index.tolist() + if "BASELINE: recovery+mom" not in top3: + top3.append("BASELINE: recovery+mom") + + print(f"\n{'='*110}") + print(f" YEARLY RETURNS — TOP STRATEGIES vs BASELINE — {market.upper()}") + print(f"{'='*110}") + + yr_data = {} + for name in top3: + yr_data[name] = yearly_returns(equities[name]) + if bench is not None: + yr_data["BENCHMARK"] = yearly_returns(equities["BENCHMARK"]) + + all_years = sorted(set(y for yd in yr_data.values() for y in yd.keys())) + + # Print header + 
col_names = top3 + (["BENCHMARK"] if bench is not None else []) + header = f" {'Year':<6}" + for c in col_names: + header += f" | {c[:25]:>25}" + print(header) + print(" " + "-" * (6 + 28 * len(col_names))) + + for year in all_years: + line = f" {year:<6}" + for c in col_names: + r = yr_data.get(c, {}).get(year, 0) + line += f" | {r:>+24.1%}" + print(line) + + # Compute period summaries + for n_years in [3, 5, 10]: + cutoff = stocks.index[-1] - pd.DateOffset(years=n_years) + print(f"\n --- {n_years}-year CAGR ---") + for name in col_names: + eq = equities.get(name) + if eq is None: + continue + eq_slice = eq[eq.index >= cutoff] + if len(eq_slice) < 50: + continue + total = eq_slice.iloc[-1] / eq_slice.iloc[0] - 1 + cagr = (1 + total) ** (1 / n_years) - 1 + print(f" {name[:40]:<40} {cagr:>+8.1%}") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--market", default="us", choices=["us", "cn"]) + args = parser.parse_args() + run(args.market) + + +if __name__ == "__main__": + main() diff --git a/factor_research.py b/factor_research.py new file mode 100644 index 0000000..ffb208b --- /dev/null +++ b/factor_research.py @@ -0,0 +1,547 @@ +""" +Factor Research Script — Professional QR-style factor mining. + +Tests candidate alpha factors using: + - Information Coefficient (IC): rank correlation of signal vs forward returns + - IC Information Ratio (ICIR): mean(IC) / std(IC), measures signal consistency + - Quintile long-short spread: monotonicity of returns across signal buckets + - Turnover: daily rank change, proxy for trading cost + - Decay profile: IC at 1d, 5d, 10d, 20d horizons + - Sub-period stability: IC consistency across rolling windows + - Factor correlation matrix: ensures new factors are orthogonal to known ones + +Usage: + uv run python factor_research.py --market us + uv run python factor_research.py --market cn +""" + +from __future__ import annotations + +import argparse +import warnings + +import numpy as np +import pandas as pd + +import data_manager +from universe import UNIVERSES + +warnings.filterwarnings("ignore", category=FutureWarning) + +HORIZONS = [1, 5, 10, 20] + + +# --------------------------------------------------------------------------- +# Factor definitions — each returns a DataFrame (dates x stocks) of scores +# --------------------------------------------------------------------------- + +def _safe_rank(df: pd.DataFrame) -> pd.DataFrame: + return df.rank(axis=1, pct=True, na_option="keep") + + +def _rolling_ret(prices: pd.DataFrame, window: int) -> pd.DataFrame: + return prices.pct_change(window) + + +# --- Known factors (baselines) --- + +def factor_momentum_12_1(prices: pd.DataFrame) -> pd.DataFrame: + """Classic 12-1 month momentum.""" + return prices.shift(21).pct_change(231) + + +def factor_recovery(prices: pd.DataFrame) -> pd.DataFrame: + """Price / 63-day low - 1.""" + return prices / prices.rolling(63, min_periods=63).min() - 1 + + +def factor_inverse_vol(prices: pd.DataFrame) -> pd.DataFrame: + """Negative 60-day realized volatility (low vol = high score).""" + return -prices.pct_change().rolling(60, min_periods=60).std() + + +# --- NEW candidate factors --- + +def factor_short_term_reversal(prices: pd.DataFrame) -> pd.DataFrame: + """5-day return reversal. Hypothesis: short-term mean reversion.""" + return -prices.pct_change(5) + + +def factor_idio_vol_change(prices: pd.DataFrame) -> pd.DataFrame: + """Change in idiosyncratic volatility (20d vs 60d). 
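+ Worked example (hypothetical numbers): 20d vol of 1.5% against 60d vol of
+ 2.0% scores -(0.015/0.020 - 1) = +0.25, so contracting vol ranks higher.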
+ Hypothesis: declining vol = stabilizing, predicts positive returns.""" + ret = prices.pct_change() + vol_20 = ret.rolling(20, min_periods=20).std() + vol_60 = ret.rolling(60, min_periods=60).std() + return -(vol_20 / vol_60.replace(0, np.nan) - 1) # negative = vol declining + + +def factor_volume_price_divergence(prices: pd.DataFrame, volume: pd.DataFrame | None = None) -> pd.DataFrame: + """Price up but momentum fading — proxy via acceleration. + Without volume data, use return acceleration as proxy.""" + ret_5 = prices.pct_change(5) + ret_20 = prices.pct_change(20) + return ret_5 - ret_20 / 4 # recent returns outpacing trend + + +def factor_max_drawdown_recovery(prices: pd.DataFrame) -> pd.DataFrame: + """How much of the 60-day max drawdown has been recovered. + Hypothesis: stocks that recover from drawdowns continue recovering.""" + rolling_max = prices.rolling(60, min_periods=60).max() + drawdown = prices / rolling_max - 1 # negative + rolling_min_dd = drawdown.rolling(60, min_periods=20).min() # worst drawdown + recovery_pct = drawdown / rolling_min_dd.replace(0, np.nan) + return recovery_pct # closer to 0 = more recovered + + +def factor_skewness(prices: pd.DataFrame) -> pd.DataFrame: + """Negative 20-day return skewness. + Hypothesis: negatively skewed stocks are overpriced (lottery preference).""" + ret = prices.pct_change() + return -ret.rolling(20, min_periods=20).skew() + + +def factor_high_low_range(prices: pd.DataFrame) -> pd.DataFrame: + """20-day high-low range relative to price. + Hypothesis: narrow range = consolidation, breakout ahead.""" + high_20 = prices.rolling(20, min_periods=20).max() + low_20 = prices.rolling(20, min_periods=20).min() + mid = (high_20 + low_20) / 2 + return -(high_20 - low_20) / mid.replace(0, np.nan) # negative range = narrow = high score + + +def factor_mean_reversion_residual(prices: pd.DataFrame) -> pd.DataFrame: + """Distance from 20-day MA as fraction of 60-day vol. + Hypothesis: stocks far below MA revert. Z-score style.""" + ma_20 = prices.rolling(20, min_periods=20).mean() + vol_60 = prices.pct_change().rolling(60, min_periods=60).std() * prices + return -(prices - ma_20) / vol_60.replace(0, np.nan) # below MA = high score + + +def factor_up_down_vol_ratio(prices: pd.DataFrame) -> pd.DataFrame: + """Ratio of upside to downside semi-deviation (20d). + Hypothesis: stocks with more upside vol have positive momentum.""" + ret = prices.pct_change() + up_vol = ret.where(ret > 0, 0).rolling(20, min_periods=15).std() + down_vol = ret.where(ret < 0, 0).rolling(20, min_periods=15).std() + return up_vol / down_vol.replace(0, np.nan) + + +def factor_consecutive_up_days(prices: pd.DataFrame) -> pd.DataFrame: + """Fraction of positive return days in last 10 days. + Hypothesis: persistent winners keep winning (short-term).""" + ret = prices.pct_change() + return (ret > 0).astype(float).rolling(10, min_periods=10).mean() + + +def factor_gap_momentum(prices: pd.DataFrame) -> pd.DataFrame: + """Cumulative overnight-like gaps: close-to-close vs intraday proxy. + Using 1-day returns smoothed over 20 days minus 5-day return. + Hypothesis: smooth consistent returns beat volatile ones.""" + ret_1d = prices.pct_change() + smoothness = ret_1d.rolling(20, min_periods=20).mean() * 20 + raw_20d = prices.pct_change(20) + return smoothness - raw_20d # positive = smoother path + + +def factor_recovery_acceleration(prices: pd.DataFrame) -> pd.DataFrame: + """Rate of change of recovery factor. 
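+ Worked example (hypothetical numbers): a recovery level rising from 0.10
+ to 0.15 over five sessions scores 0.15/0.10 - 1 = +0.50.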
+ Hypothesis: accelerating recovery is stronger signal than level.""" + recovery = prices / prices.rolling(63, min_periods=63).min() - 1 + return recovery.pct_change(5) + + +def factor_trend_strength(prices: pd.DataFrame) -> pd.DataFrame: + """R-squared of log-price vs time over 60 days. + Hypothesis: stocks trending linearly (high R2) continue.""" + log_p = np.log(prices.replace(0, np.nan)) + def _r2(series): + y = series.dropna().values + if len(y) < 30: + return np.nan + x = np.arange(len(y), dtype=float) + x -= x.mean() + y_dm = y - y.mean() + ss_xy = (x * y_dm).sum() + ss_xx = (x * x).sum() + ss_yy = (y_dm * y_dm).sum() + if ss_xx == 0 or ss_yy == 0: + return np.nan + r2 = (ss_xy ** 2) / (ss_xx * ss_yy) + slope = ss_xy / ss_xx + return r2 if slope > 0 else -r2 # sign by direction + return log_p.rolling(60, min_periods=30).apply(_r2, raw=False) + + +def factor_relative_volume_momentum(prices: pd.DataFrame) -> pd.DataFrame: + """Price momentum weighted by how 'cheap' a stock is relative to 52-week range. + Hypothesis: momentum in stocks near lows is more likely to persist.""" + mom_20 = prices.pct_change(20) + high_252 = prices.rolling(252, min_periods=126).max() + low_252 = prices.rolling(252, min_periods=126).min() + position_in_range = (prices - low_252) / (high_252 - low_252).replace(0, np.nan) + return mom_20 * (1 - position_in_range) # momentum * cheapness + + +def factor_52w_high_distance(prices: pd.DataFrame) -> pd.DataFrame: + """Distance from 52-week high. + Hypothesis: stocks near their highs continue (anchoring bias).""" + high_252 = prices.rolling(252, min_periods=126).max() + return prices / high_252 # closer to 1 = near high + + +def factor_downside_beta_proxy(prices: pd.DataFrame) -> pd.DataFrame: + """Proxy for downside beta using co-movement on market down days. + Hypothesis: low downside beta outperforms (asymmetric risk).""" + ret = prices.pct_change() + market_ret = ret.mean(axis=1) + down_days = market_ret < 0 + + # Mask non-down-day returns to NaN, then rolling mean + # Use numpy for correct broadcasting, wider window (120d) so ~54 down + # days are available, well above min_periods=20 + arr = ret.values.copy() + arr[~down_days.values, :] = np.nan + down_ret = pd.DataFrame(arr, index=ret.index, columns=ret.columns) + avg_down = down_ret.rolling(120, min_periods=20).mean() + return -avg_down # negative = less downside = good + + +# --- A-share specific factors --- + +def factor_liquidity_premium(prices: pd.DataFrame) -> pd.DataFrame: + """Amihud illiquidity proxy (using returns only, no volume). + Hypothesis: illiquid stocks earn premium in A-shares (retail driven).""" + ret = prices.pct_change() + # Use absolute return as illiquidity proxy (higher abs ret = less liquid) + illiq = ret.abs().rolling(20, min_periods=15).mean() + return illiq + + +def factor_lottery_demand(prices: pd.DataFrame) -> pd.DataFrame: + """Max daily return in past 20 days (negative). + Hypothesis: lottery stocks (high max return) underperform. + Strong in A-shares due to retail speculation.""" + ret = prices.pct_change() + return -ret.rolling(20, min_periods=15).max() + + +def factor_turnover_reversal(prices: pd.DataFrame) -> pd.DataFrame: + """Interaction of short-term returns with volatility. + High vol + negative return = oversold bounce candidate. 
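+ Worked example (hypothetical numbers): an 8% five-day drop at 3% daily vol
+ scores -(-0.08)*0.03 = +0.0024 versus +0.0008 at 1% vol, so the volatile
+ loser ranks higher.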
+ Common A-share alpha source.""" + ret_5 = prices.pct_change(5) + vol_20 = prices.pct_change().rolling(20, min_periods=15).std() + return -ret_5 * vol_20 # oversold + high vol = positive + + +def factor_price_level(prices: pd.DataFrame) -> pd.DataFrame: + """Negative absolute price level. + Hypothesis: low-priced stocks attract retail in A-shares (penny stock effect).""" + return -prices + + +# --------------------------------------------------------------------------- +# IC and analytics engine +# --------------------------------------------------------------------------- + +def compute_ic( + signal: pd.DataFrame, + forward_ret: pd.DataFrame, +) -> pd.Series: + """Cross-sectional rank IC (Spearman) per day.""" + common_idx = signal.index.intersection(forward_ret.index) + common_cols = signal.columns.intersection(forward_ret.columns) + sig = signal.loc[common_idx, common_cols] + fwd = forward_ret.loc[common_idx, common_cols] + + ics = [] + for date in common_idx: + s = sig.loc[date].dropna() + f = fwd.loc[date].dropna() + common = s.index.intersection(f.index) + if len(common) < 30: + continue + ic = s[common].corr(f[common], method="spearman") + if np.isfinite(ic): + ics.append((date, ic)) + + if not ics: + return pd.Series(dtype=float) + return pd.Series(dict(ics)) + + +def compute_quintile_returns( + signal: pd.DataFrame, + forward_ret: pd.DataFrame, + n_quantiles: int = 5, +) -> pd.DataFrame: + """Average forward return by signal quintile, per day, then time-averaged.""" + common_idx = signal.index.intersection(forward_ret.index) + common_cols = signal.columns.intersection(forward_ret.columns) + sig = signal.loc[common_idx, common_cols] + fwd = forward_ret.loc[common_idx, common_cols] + + records = [] + for date in common_idx: + s = sig.loc[date].dropna() + f = fwd.loc[date].dropna() + common = s.index.intersection(f.index) + if len(common) < 50: + continue + scores = s[common] + rets = f[common] + try: + quintile = pd.qcut(scores, n_quantiles, labels=False, duplicates="drop") + except ValueError: + continue + for q in range(n_quantiles): + mask = quintile == q + if mask.sum() > 0: + records.append({"date": date, "quintile": q + 1, "return": rets[mask].mean()}) + + if not records: + return pd.DataFrame() + df = pd.DataFrame(records) + return df.groupby("quintile")["return"].mean() * 252 # annualize + + +def compute_turnover(signal: pd.DataFrame) -> float: + """Average daily rank change (turnover proxy).""" + ranked = signal.rank(axis=1, pct=True, na_option="keep") + daily_change = ranked.diff().abs().mean(axis=1) + return float(daily_change.mean()) + + +def compute_factor_correlation(factors: dict[str, pd.DataFrame]) -> pd.DataFrame: + """Cross-sectional IC correlation between all factor pairs.""" + names = list(factors.keys()) + n = len(names) + corr_matrix = pd.DataFrame(np.nan, index=names, columns=names) + + # Use time-series of rank-averaged signals + avg_ranks = {} + for name, sig in factors.items(): + ranked = sig.rank(axis=1, pct=True, na_option="keep") + avg_ranks[name] = ranked.mean(axis=1).dropna() + + for i in range(n): + for j in range(i, n): + s1 = avg_ranks[names[i]] + s2 = avg_ranks[names[j]] + common = s1.index.intersection(s2.index) + if len(common) > 100: + c = s1[common].corr(s2[common]) + corr_matrix.loc[names[i], names[j]] = c + corr_matrix.loc[names[j], names[i]] = c + if i == j: + corr_matrix.loc[names[i], names[j]] = 1.0 + + return corr_matrix + + +def analyze_factor( + name: str, + signal: pd.DataFrame, + prices: pd.DataFrame, + horizons: list[int] | None = 
None, +) -> dict: + """Full single-factor analysis.""" + if horizons is None: + horizons = HORIZONS + + results = {"name": name} + + # Forward returns at each horizon + for h in horizons: + fwd_ret = prices.pct_change(h).shift(-h) + ic_series = compute_ic(signal, fwd_ret) + + if len(ic_series) == 0: + results[f"ic_{h}d"] = np.nan + results[f"icir_{h}d"] = np.nan + continue + + ic_mean = ic_series.mean() + ic_std = ic_series.std() + icir = ic_mean / ic_std if ic_std > 0 else 0.0 + + results[f"ic_{h}d"] = ic_mean + results[f"icir_{h}d"] = icir + + if h == 1: + results["ic_1d_series"] = ic_series + + # Quintile analysis at 5-day horizon + fwd_5d = prices.pct_change(5).shift(-5) + quintiles = compute_quintile_returns(signal, fwd_5d) + if not quintiles.empty: + results["q5_return"] = float(quintiles.iloc[-1]) # top quintile + results["q1_return"] = float(quintiles.iloc[0]) # bottom quintile + results["long_short_ann"] = float(quintiles.iloc[-1] - quintiles.iloc[0]) + results["monotonicity"] = float(quintiles.corr(pd.Series(range(1, len(quintiles) + 1), index=quintiles.index))) + results["quintile_returns"] = quintiles + else: + results["q5_return"] = np.nan + results["q1_return"] = np.nan + results["long_short_ann"] = np.nan + results["monotonicity"] = np.nan + + # Turnover + results["turnover"] = compute_turnover(signal) + + # Sub-period IC stability (rolling 252-day IC mean) + if "ic_1d_series" in results and len(results["ic_1d_series"]) > 252: + rolling_ic = results["ic_1d_series"].rolling(252).mean().dropna() + results["ic_stability"] = float((rolling_ic > 0).mean()) # fraction of time IC > 0 + results["ic_worst_year"] = float(rolling_ic.min()) + results["ic_best_year"] = float(rolling_ic.max()) + else: + results["ic_stability"] = np.nan + results["ic_worst_year"] = np.nan + results["ic_best_year"] = np.nan + + return results + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def get_all_factors(prices: pd.DataFrame, market: str) -> dict[str, pd.DataFrame]: + """Build all candidate factor signals.""" + factors = {} + + # Known baselines + factors["momentum_12_1"] = factor_momentum_12_1(prices) + factors["recovery"] = factor_recovery(prices) + factors["inverse_vol"] = factor_inverse_vol(prices) + + # New candidates — universal + factors["short_term_reversal"] = factor_short_term_reversal(prices) + factors["idio_vol_change"] = factor_idio_vol_change(prices) + factors["return_acceleration"] = factor_volume_price_divergence(prices) + factors["drawdown_recovery"] = factor_max_drawdown_recovery(prices) + factors["neg_skewness"] = factor_skewness(prices) + factors["range_compression"] = factor_high_low_range(prices) + factors["mean_rev_zscore"] = factor_mean_reversion_residual(prices) + factors["up_down_vol_ratio"] = factor_up_down_vol_ratio(prices) + factors["win_streak"] = factor_consecutive_up_days(prices) + factors["smooth_momentum"] = factor_gap_momentum(prices) + factors["recovery_accel"] = factor_recovery_acceleration(prices) + factors["trend_r2"] = factor_trend_strength(prices) + factors["cheap_momentum"] = factor_relative_volume_momentum(prices) + factors["near_52w_high"] = factor_52w_high_distance(prices) + factors["low_downside_beta"] = factor_downside_beta_proxy(prices) + + # A-share specific (also test on US for comparison) + if market == "cn": + factors["illiquidity"] = factor_liquidity_premium(prices) + factors["anti_lottery"] = factor_lottery_demand(prices) + 
factors["vol_reversal"] = factor_turnover_reversal(prices) + factors["low_price"] = factor_price_level(prices) + + return factors + + +def print_summary_table(results: list[dict], market: str) -> None: + """Print a ranked summary of all factors.""" + rows = [] + for r in results: + rows.append({ + "Factor": r["name"], + "IC_1d": r.get("ic_1d", np.nan), + "ICIR_1d": r.get("icir_1d", np.nan), + "IC_5d": r.get("ic_5d", np.nan), + "ICIR_5d": r.get("icir_5d", np.nan), + "IC_20d": r.get("ic_20d", np.nan), + "ICIR_20d": r.get("icir_20d", np.nan), + "LS_5d_ann": r.get("long_short_ann", np.nan), + "Mono": r.get("monotonicity", np.nan), + "Turnover": r.get("turnover", np.nan), + "IC_stab": r.get("ic_stability", np.nan), + "IC_worst_yr": r.get("ic_worst_year", np.nan), + }) + + df = pd.DataFrame(rows).set_index("Factor") + df = df.sort_values("ICIR_5d", ascending=False) + + print(f"\n{'='*100}") + print(f" FACTOR RESEARCH RESULTS — {market.upper()} MARKET") + print(f"{'='*100}") + print("\nRanked by 5-day ICIR (most important metric for tradeable alpha):\n") + print(df.round(4).to_string()) + + # Highlight top factors + print(f"\n{'='*100}") + print(" TOP FACTORS (ICIR_5d > 0.05 and IC_stability > 0.6)") + print(f"{'='*100}") + top = df[(df["ICIR_5d"].abs() > 0.05) & (df["IC_stab"] > 0.6)] + if top.empty: + top = df.head(5) + print(" (No factor met strict threshold; showing top 5 by ICIR_5d)") + print(top.round(4).to_string()) + + # Quintile details for top factors + print(f"\n{'='*100}") + print(" QUINTILE RETURN PROFILES (annualized, 5-day forward)") + print(f"{'='*100}") + for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:8]: + qr = r.get("quintile_returns") + if qr is not None and not qr.empty: + q_str = " ".join(f"Q{int(k)}: {v:+.1%}" for k, v in qr.items()) + ls = r.get("long_short_ann", 0) + print(f" {r['name']:25s} | {q_str} | L/S: {ls:+.1%}") + + +def main(): + parser = argparse.ArgumentParser(description="Factor research") + parser.add_argument("--market", default="us", choices=["us", "cn"]) + parser.add_argument("--years", type=int, default=None, help="Limit to last N years") + args = parser.parse_args() + + market = args.market + config = UNIVERSES[market] + benchmark = config["benchmark"] + + print(f"Loading {market.upper()} price data...") + prices = data_manager.load(market) + + # Remove benchmark from stock universe + stocks = prices.drop(columns=[benchmark], errors="ignore") + + if args.years: + cutoff = stocks.index[-1] - pd.DateOffset(years=args.years) + stocks = stocks[stocks.index >= cutoff] + + print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} trading days") + print(f"Date range: {stocks.index[0].date()} to {stocks.index[-1].date()}") + + # Build all factor signals + print("\nComputing factor signals...") + factors = get_all_factors(stocks, market) + + # Analyze each factor + print("Running factor analysis (this may take a few minutes)...") + results = [] + for name, signal in factors.items(): + print(f" Analyzing: {name}...") + r = analyze_factor(name, signal, stocks) + results.append(r) + + # Print results + print_summary_table(results, market) + + # Factor correlation matrix + print(f"\n{'='*100}") + print(" FACTOR CORRELATION MATRIX (rank-averaged cross-sectional)") + print(f"{'='*100}") + corr = compute_factor_correlation(factors) + # Show only top factors + top_names = [r["name"] for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:10]] + top_names_present = [n for n in top_names if n in corr.index] + 
print(corr.loc[top_names_present, top_names_present].round(2).to_string()) + + +if __name__ == "__main__": + main() diff --git a/factor_robustness.py b/factor_robustness.py new file mode 100644 index 0000000..1562667 --- /dev/null +++ b/factor_robustness.py @@ -0,0 +1,323 @@ +""" +Robustness checks for winning factor strategies. + +Tests: +1. Rolling 2-year window performance (stability) +2. Top-N sensitivity (5, 10, 15, 20) +3. Rebalance frequency sensitivity (5d, 10d, 21d, 42d) +4. Transaction cost sensitivity (0, 10bps, 20bps, 50bps) +5. Drawdown analysis +""" + +from __future__ import annotations + +import argparse +import warnings + +import numpy as np +import pandas as pd + +import data_manager +from universe import UNIVERSES +from factor_real_backtest import ( + f_recovery_mom, + f_momentum_12_1, + f_recovery, + f_recovery_deep, + f_up_volume_proxy, + f_gap_up_freq, + f_earnings_drift_proxy, + f_reversal_vol_cn, + f_consistent_winner, + combo_signal, + make_strategy, + run_backtest, + compute_stats, +) + +warnings.filterwarnings("ignore") + + +def rolling_window_performance(equity: pd.Series, window_years: int = 2): + """Compute rolling window returns.""" + daily_ret = equity.pct_change().dropna() + window = 252 * window_years + results = [] + for end_idx in range(window, len(daily_ret), 63): # step 3 months + start_idx = end_idx - window + chunk = daily_ret.iloc[start_idx:end_idx] + total = (1 + chunk).prod() - 1 + ann = (1 + total) ** (252 / len(chunk)) - 1 + sharpe = chunk.mean() / chunk.std() * np.sqrt(252) if chunk.std() > 0 else 0 + results.append({ + "end_date": chunk.index[-1].date(), + "ann_return": ann, + "sharpe": sharpe, + }) + return pd.DataFrame(results) + + +def drawdown_analysis(equity: pd.Series) -> pd.DataFrame: + """Find top 5 drawdown episodes.""" + running_max = equity.cummax() + drawdown = (equity - running_max) / running_max + + # Find drawdown episodes + episodes = [] + in_dd = False + start = None + for i in range(len(drawdown)): + if drawdown.iloc[i] < -0.05 and not in_dd: + in_dd = True + start = i + elif drawdown.iloc[i] >= 0 and in_dd: + in_dd = False + trough_idx = drawdown.iloc[start:i].idxmin() + episodes.append({ + "start": drawdown.index[start].date(), + "trough": trough_idx.date(), + "end": drawdown.index[i].date(), + "depth": drawdown.loc[trough_idx], + "duration_days": i - start, + }) + # Handle ongoing drawdown + if in_dd: + trough_idx = drawdown.iloc[start:].idxmin() + episodes.append({ + "start": drawdown.index[start].date(), + "trough": trough_idx.date(), + "end": "ongoing", + "depth": drawdown.loc[trough_idx], + "duration_days": len(drawdown) - start, + }) + + df = pd.DataFrame(episodes) + if df.empty: + return df + return df.nsmallest(5, "depth") + + +def run_us(stocks: pd.DataFrame): + print("=" * 100) + print(" US ROBUSTNESS — Winner: momentum_12_1 + up_volume_proxy") + print("=" * 100) + + winner_func = combo_signal([(f_momentum_12_1, 0.5), (f_up_volume_proxy, 0.5)]) + baseline_func = f_recovery_mom + + # 1. Rolling 2-year performance + print("\n--- 1. 
Rolling 2-Year Performance ---\n") + for label, func in [("Winner: mom+upvol", winner_func), + ("Baseline: rec+mom", baseline_func)]: + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks) + roll = rolling_window_performance(eq) + if roll.empty: + continue + win_pct = (roll["ann_return"] > 0).mean() + print(f" {label}:") + print(f" Mean 2yr ann return: {roll['ann_return'].mean():+.1%}") + print(f" Min 2yr ann return: {roll['ann_return'].min():+.1%}") + print(f" Max 2yr ann return: {roll['ann_return'].max():+.1%}") + print(f" % positive 2yr: {win_pct:.0%}") + print(f" Mean 2yr Sharpe: {roll['sharpe'].mean():.2f}") + print() + + # 2. Top-N sensitivity + print("--- 2. Top-N Sensitivity ---\n") + header = f" {'Top-N':<8}" + for label in ["Winner: mom+upvol", "Baseline: rec+mom"]: + header += f" | {'CAGR':>8} {'Sharpe':>8} {'MaxDD':>8}" + print(header) + print(" " + "-" * 70) + + for top_n in [5, 10, 15, 20, 30]: + line = f" {top_n:<8}" + for func in [winner_func, baseline_func]: + w = make_strategy(stocks, func, top_n=top_n) + eq = run_backtest(w, stocks) + s = compute_stats(eq, "") + line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f} {s['maxdd']:>+7.1%}" + print(line) + + # 3. Rebalance frequency sensitivity + print("\n--- 3. Rebalance Frequency Sensitivity ---\n") + header = f" {'Rebal':<8}" + for label in ["Winner: mom+upvol", "Baseline: rec+mom"]: + header += f" | {'CAGR':>8} {'Sharpe':>8} {'MaxDD':>8}" + print(header) + print(" " + "-" * 70) + + for rebal in [5, 10, 21, 42, 63]: + line = f" {rebal}d{'':<5}" + for func in [winner_func, baseline_func]: + w = make_strategy(stocks, func, top_n=10, rebal_freq=rebal) + eq = run_backtest(w, stocks) + s = compute_stats(eq, "") + line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f} {s['maxdd']:>+7.1%}" + print(line) + + # 4. Transaction cost sensitivity + print("\n--- 4. Transaction Cost Sensitivity ---\n") + header = f" {'Cost':<8}" + for label in ["Winner: mom+upvol", "Baseline: rec+mom"]: + header += f" | {'CAGR':>8} {'Sharpe':>8}" + print(header) + print(" " + "-" * 50) + + for cost in [0, 0.001, 0.002, 0.005]: + line = f" {cost*10000:.0f}bps{'':<4}" + for func in [winner_func, baseline_func]: + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks, cost=cost) + s = compute_stats(eq, "") + line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f}" + print(line) + + # 5. Drawdown analysis + print("\n--- 5. Drawdown Episodes ---\n") + for label, func in [("Winner: mom+upvol", winner_func), + ("Baseline: rec+mom", baseline_func)]: + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks) + dd = drawdown_analysis(eq) + print(f" {label}:") + if dd.empty: + print(" No significant drawdowns") + else: + for _, row in dd.iterrows(): + print(f" {row['start']} → {row['trough']} → {row['end']}: " + f"{row['depth']:+.1%} ({row['duration_days']}d)") + print() + + # 6. Also test the runner-up combos + print("--- 6. 
Other Strong Combos (Top-10, 21d rebal, 10bps) ---\n") + other_combos = [ + ("rec_deep+upvol", combo_signal([(f_recovery_deep, 0.5), (f_up_volume_proxy, 0.5)])), + ("rec_deep+mom", combo_signal([(f_recovery_deep, 0.5), (f_momentum_12_1, 0.5)])), + ("mom+gap_up", combo_signal([(f_momentum_12_1, 0.5), (f_gap_up_freq, 0.5)])), + ("rec_deep+upvol+mom", combo_signal([(f_recovery_deep, 0.33), (f_up_volume_proxy, 0.33), (f_momentum_12_1, 0.34)])), + ("mom+upvol+gap", combo_signal([(f_momentum_12_1, 0.33), (f_up_volume_proxy, 0.33), (f_gap_up_freq, 0.34)])), + ] + for label, func in other_combos: + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks) + s = compute_stats(eq, "") + print(f" {label:<25} CAGR: {s['cagr']:>+7.1%} Sharpe: {s['sharpe']:.2f} MaxDD: {s['maxdd']:>+7.1%} Calmar: {s['calmar']:.2f}") + + +def run_cn(stocks: pd.DataFrame): + print("\n" + "=" * 100) + print(" CN ROBUSTNESS — Winners: reversal_vol + gap_up, earn_drift + reversal_vol") + print("=" * 100) + + winner1_func = combo_signal([(f_reversal_vol_cn, 0.5), (f_gap_up_freq, 0.5)]) + winner2_func = combo_signal([(f_earnings_drift_proxy, 0.5), (f_reversal_vol_cn, 0.5)]) + baseline_func = f_recovery_mom + + # 1. Rolling 2-year performance + print("\n--- 1. Rolling 2-Year Performance ---\n") + for label, func in [("W1: rev_vol+gap_up", winner1_func), + ("W2: earn_drift+rev_vol", winner2_func), + ("Baseline: rec+mom", baseline_func)]: + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks) + roll = rolling_window_performance(eq) + if roll.empty: + continue + win_pct = (roll["ann_return"] > 0).mean() + print(f" {label}:") + print(f" Mean 2yr ann return: {roll['ann_return'].mean():+.1%}") + print(f" Min 2yr ann return: {roll['ann_return'].min():+.1%}") + print(f" Max 2yr ann return: {roll['ann_return'].max():+.1%}") + print(f" % positive 2yr: {win_pct:.0%}") + print(f" Mean 2yr Sharpe: {roll['sharpe'].mean():.2f}") + print() + + # 2. Top-N sensitivity + print("--- 2. Top-N Sensitivity ---\n") + header = f" {'Top-N':<8}" + for label in ["W1: rev+gap", "W2: earn+rev", "Baseline"]: + header += f" | {'CAGR':>8} {'Sharpe':>8} {'MaxDD':>8}" + print(header) + print(" " + "-" * 100) + + for top_n in [5, 10, 15, 20]: + line = f" {top_n:<8}" + for func in [winner1_func, winner2_func, baseline_func]: + w = make_strategy(stocks, func, top_n=top_n) + eq = run_backtest(w, stocks) + s = compute_stats(eq, "") + line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f} {s['maxdd']:>+7.1%}" + print(line) + + # 3. Rebalance frequency + print("\n--- 3. Rebalance Frequency ---\n") + header = f" {'Rebal':<8}" + for label in ["W1: rev+gap", "W2: earn+rev", "Baseline"]: + header += f" | {'CAGR':>8} {'Sharpe':>8}" + print(header) + print(" " + "-" * 75) + + for rebal in [5, 10, 21, 42]: + line = f" {rebal}d{'':<5}" + for func in [winner1_func, winner2_func, baseline_func]: + w = make_strategy(stocks, func, top_n=10, rebal_freq=rebal) + eq = run_backtest(w, stocks) + s = compute_stats(eq, "") + line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f}" + print(line) + + # 4. Transaction cost sensitivity + print("\n--- 4. 
Transaction Cost Sensitivity ---\n") + header = f" {'Cost':<8}" + for label in ["W1: rev+gap", "W2: earn+rev", "Baseline"]: + header += f" | {'CAGR':>8} {'Sharpe':>8}" + print(header) + print(" " + "-" * 75) + + for cost in [0, 0.001, 0.002, 0.005]: + line = f" {cost*10000:.0f}bps{'':<4}" + for func in [winner1_func, winner2_func, baseline_func]: + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks, cost=cost) + s = compute_stats(eq, "") + line += f" | {s['cagr']:>+7.1%} {s['sharpe']:>8.2f}" + print(line) + + # 5. Drawdown analysis + print("\n--- 5. Drawdown Episodes ---\n") + for label, func in [("W1: rev_vol+gap_up", winner1_func), + ("W2: earn_drift+rev_vol", winner2_func), + ("Baseline: rec+mom", baseline_func)]: + w = make_strategy(stocks, func, top_n=10) + eq = run_backtest(w, stocks) + dd = drawdown_analysis(eq) + print(f" {label}:") + if dd.empty: + print(" No significant drawdowns") + else: + for _, row in dd.iterrows(): + print(f" {row['start']} → {row['trough']} → {row['end']}: " + f"{row['depth']:+.1%} ({row['duration_days']}d)") + print() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--market", default="both", choices=["us", "cn", "both"]) + args = parser.parse_args() + + if args.market in ("us", "both"): + prices = data_manager.load("us") + stocks = prices.drop(columns=["SPY"], errors="ignore") + run_us(stocks) + + if args.market in ("cn", "both"): + prices = data_manager.load("cn") + stocks = prices.drop(columns=["000300.SS"], errors="ignore") + run_cn(stocks) + + +if __name__ == "__main__": + main() diff --git a/factor_yearly_fresh.py b/factor_yearly_fresh.py new file mode 100644 index 0000000..1034845 --- /dev/null +++ b/factor_yearly_fresh.py @@ -0,0 +1,259 @@ +""" +Rebalancing frequency comparison: daily (1d) vs weekly (5d) vs biweekly (10d) vs monthly (21d). +Shows yearly returns and max drawdown for each frequency, for all champion strategies. 
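+
+Usage (no CLI flags are defined; both markets always run):
+
+    uv run python factor_yearly_fresh.py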
+""" + +from __future__ import annotations +import warnings +import numpy as np +import pandas as pd +import data_manager +from factor_loop import ( + strat, bt, stats, combo, + f_rec_mom, f_rec_126, f_rec_63, + f_mom_12_1, f_mom_6_1, f_mom_intermediate, + f_above_ma200, f_golden_cross, + f_up_volume_proxy, f_gap_up_freq, + f_rec_mom_filtered, f_down_resilience, + f_up_capture, f_52w_high, f_str_10d, + f_earnings_drift, f_reversal_vol, +) + +warnings.filterwarnings("ignore") + +INITIAL = 10_000 + +REBAL_CONFIGS = [ + ("daily", 1), + ("weekly", 5), + ("biweekly", 10), + ("monthly", 21), +] + + +def f_quality_mom(p): + mom = f_mom_12_1(p) + consist = (p.pct_change() > 0).astype(float).rolling(252, min_periods=126).mean() + mom_r = mom.rank(axis=1, pct=True, na_option="keep") + con_r = consist.rank(axis=1, pct=True, na_option="keep") + up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep") + return 0.4 * mom_r + 0.3 * con_r + 0.3 * up_r + + +def f_mom_x_gap(p): + return (f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") * + f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep")) + + +def run_equity(func, prices, rebal=21, cost=0.001): + w = strat(prices, func, top_n=10, rebal=rebal) + eq = bt(w, prices, cost=cost) + return eq / eq.iloc[0] * INITIAL + + +def year_returns(eq: pd.Series) -> dict[int, float]: + dr = eq.pct_change().fillna(0) + return {y: float((1 + dr[dr.index.year == y]).prod() - 1) + for y in sorted(dr.index.year.unique())} + + +def max_drawdown(eq: pd.Series) -> float: + rm = eq.cummax() + dd = (eq - rm) / rm + return float(dd.min()) + + +def max_drawdown_yearly(eq: pd.Series) -> dict[int, float]: + result = {} + for y in sorted(eq.index.year.unique()): + chunk = eq[eq.index.year == y] + if len(chunk) < 5: + continue + rm = chunk.cummax() + dd = (chunk - rm) / rm + result[y] = float(dd.min()) + return result + + +def cagr(eq: pd.Series) -> float: + dr = eq.pct_change().dropna() + if len(dr) < 100: + return np.nan + ny = len(dr) / 252 + tot = eq.iloc[-1] / eq.iloc[0] - 1 + return (1 + tot) ** (1 / ny) - 1 + + +def sharpe(eq: pd.Series) -> float: + dr = eq.pct_change().dropna() + if len(dr) < 100 or dr.std() == 0: + return np.nan + return float(dr.mean() / dr.std() * np.sqrt(252)) + + +def turnover_annual(func, prices, rebal): + """Estimate annualised turnover (one-way).""" + w = strat(prices, func, top_n=10, rebal=rebal) + daily_turn = w.diff().abs().sum(axis=1).mean() + return daily_turn * 252 + + +def print_by_year(strat_defs, prices, bench_eq, bench_label, market_label, years): + """For each year, print a table: strategies as rows, rebal frequencies as columns.""" + + freq_labels = [r for r, _ in REBAL_CONFIGS] + + # Pre-compute all equities and returns + all_eqs = {} # {(sname, freq): equity} + for sname, func in strat_defs.items(): + for rlabel, rdays in REBAL_CONFIGS: + all_eqs[(sname, rlabel)] = run_equity(func, prices, rebal=rdays) + + all_rets = {} # {(sname, freq): {year: ret}} + for key, eq in all_eqs.items(): + all_rets[key] = year_returns(eq) + + bench_rets = year_returns(bench_eq) + snames = list(strat_defs.keys()) + name_w = max(len(s) for s in snames) + 1 + + for year in years: + line_w = name_w + 4 + 20 * (len(freq_labels) + 1) + print(f"\n{'=' * line_w}") + print(f" {market_label} — {year} (fresh $10,000)") + print(f"{'=' * line_w}") + + # Header + print(f" {'Strategy':<{name_w}}", end="") + for f in freq_labels: + print(f" {f:>18}", end="") + print(f" {bench_label:>18}") + print(f" {'-'*name_w}", end="") + for _ in range(len(freq_labels) 
+ 1): + print(f" {'-'*18}", end="") + print() + + for sname in snames: + print(f" {sname:<{name_w}}", end="") + + # Find best freq for this strategy this year + freq_vals = {} + for f in freq_labels: + r = all_rets[(sname, f)].get(year) + if r is not None and abs(r) > 0.0005: + freq_vals[f] = r + + best_f = max(freq_vals, key=freq_vals.get) if freq_vals else None + + for f in freq_labels: + r = all_rets[(sname, f)].get(year) + if r is not None and abs(r) > 0.0005: + v = INITIAL * (1 + r) + marker = " ★" if f == best_f else " " + print(f" ${v:>9,.0f} {r:>+5.0%}{marker}", end="") + else: + print(f" {'—':>18}", end="") + + # Benchmark (same for all strategies) + br = bench_rets.get(year) + if br is not None and abs(br) > 0.0005: + print(f" ${INITIAL*(1+br):>9,.0f} {br:>+5.0%} ", end="") + else: + print(f" {'—':>18}", end="") + print() + + # Best strategy per freq + print(f" {'-'*name_w}", end="") + for _ in range(len(freq_labels) + 1): + print(f" {'-'*18}", end="") + print() + + print(f" {'BEST':<{name_w}}", end="") + for f in freq_labels: + best_r = -999 + best_s = "" + for sname in snames: + r = all_rets[(sname, f)].get(year) + if r is not None and abs(r) > 0.0005 and r > best_r: + best_r = r + best_s = sname + if best_r > -999: + print(f" ${INITIAL*(1+best_r):>9,.0f} {best_r:>+5.0%} ", end="") + else: + print(f" {'—':>18}", end="") + # bench + br = bench_rets.get(year) + if br is not None and abs(br) > 0.0005: + print(f" ${INITIAL*(1+br):>9,.0f} {br:>+5.0%} ", end="") + else: + print(f" {'—':>18}", end="") + print() + + +def main(): + years = list(range(2015, 2027)) + + # ===== US ===== + print(f"\n{'#'*130}") + print(f"{'#'*50} US MARKET {'#'*50}") + print(f"{'#'*130}") + + prices_us = data_manager.load("us") + bench_us = prices_us["SPY"].dropna() + stocks_us = prices_us.drop(columns=["SPY"], errors="ignore") + eq_spy = bench_us / bench_us.iloc[0] * INITIAL + + us_strats = { + "rec_mfilt+deep×upvol": combo([ + (f_rec_mom_filtered, 0.5), + (combo([(f_rec_126, 0.5), (f_up_volume_proxy, 0.5)]), 0.5), + ]), + "ma200+mom7m+rec126": combo([ + (f_above_ma200, 0.33), (f_mom_intermediate, 0.33), (f_rec_126, 0.34) + ]), + "rec_mfilt+ma200": combo([ + (f_rec_mom_filtered, 0.5), (f_above_ma200, 0.5) + ]), + "mom7m+rec126": combo([ + (f_mom_intermediate, 0.5), (f_rec_126, 0.5) + ]), + "BASELINE:rec+mom": f_rec_mom, + } + + print_by_year(us_strats, stocks_us, eq_spy, "SPY", "US", years) + + # ===== CN ===== + print(f"\n\n{'#'*130}") + print(f"{'#'*50} CN MARKET {'#'*50}") + print(f"{'#'*130}") + + prices_cn = data_manager.load("cn") + bench_cn = prices_cn["000300.SS"].dropna() if "000300.SS" in prices_cn.columns else None + stocks_cn = prices_cn.drop(columns=["000300.SS"], errors="ignore") + + cn_strats = { + "up_cap+quality_mom": combo([ + (f_up_capture, 0.5), (f_quality_mom, 0.5) + ]), + "down_resil+qual_mom": combo([ + (f_down_resilience, 0.5), (f_quality_mom, 0.5) + ]), + "rec63+mom×gap": combo([ + (f_rec_63, 0.5), (f_mom_x_gap, 0.5) + ]), + "up_cap+mom×gap": combo([ + (f_up_capture, 0.5), (f_mom_x_gap, 0.5) + ]), + "BASELINE:rec+mom": f_rec_mom, + } + + if bench_cn is not None: + eq_csi = bench_cn / bench_cn.iloc[0] * INITIAL + else: + eq_csi = pd.Series(dtype=float) + + print_by_year(cn_strats, stocks_cn, eq_csi, "CSI300", "CN", years) + + +if __name__ == "__main__": + main() diff --git a/factor_yearly_report.py b/factor_yearly_report.py new file mode 100644 index 0000000..d74f114 --- /dev/null +++ b/factor_yearly_report.py @@ -0,0 +1,219 @@ +""" +Yearly ROI report for champion strategies vs SPY, 
starting from $10,000. +""" + +from __future__ import annotations +import warnings +import numpy as np +import pandas as pd +import data_manager +from universe import UNIVERSES +from factor_loop import ( + strat, bt, stats, combo, + f_rec_mom, f_rec_126, f_rec_63, + f_mom_12_1, f_mom_6_1, f_mom_intermediate, + f_above_ma200, f_golden_cross, + f_up_volume_proxy, f_gap_up_freq, + f_rec_mom_filtered, f_down_resilience, + f_up_capture, f_52w_high, f_str_10d, + f_earnings_drift, f_reversal_vol, +) + +warnings.filterwarnings("ignore") + +INITIAL = 10_000 + + +def f_quality_mom(p): + mom = f_mom_12_1(p) + consist = (p.pct_change() > 0).astype(float).rolling(252, min_periods=126).mean() + mom_r = mom.rank(axis=1, pct=True, na_option="keep") + con_r = consist.rank(axis=1, pct=True, na_option="keep") + up_r = f_up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep") + return 0.4 * mom_r + 0.3 * con_r + 0.3 * up_r + + +def f_mom_x_gap(p): + return (f_mom_12_1(p).rank(axis=1, pct=True, na_option="keep") * + f_gap_up_freq(p).rank(axis=1, pct=True, na_option="keep")) + + +def run_equity(func, prices, cost=0.001): + w = strat(prices, func, top_n=10) + eq = bt(w, prices, cost=cost) + return eq / eq.iloc[0] * INITIAL + + +def yearly_table(equities: dict[str, pd.Series], title: str): + print(f"\n{'='*130}") + print(f" {title}") + print(f" Starting capital: ${INITIAL:,.0f}") + print(f"{'='*130}") + + names = list(equities.keys()) + all_years = sorted(set(y for eq in equities.values() for y in eq.index.year.unique())) + + # Header + print(f"\n {'Year':<6}", end="") + for n in names: + print(f" | {n[:24]:>24}", end="") + print() + print(f" {'-'*6}", end="") + for _ in names: + print(f"-+-{'-'*24}", end="") + print() + + # Track portfolio values + year_end_vals = {n: INITIAL for n in names} + + for year in all_years: + print(f" {year:<6}", end="") + for n in names: + eq = equities[n] + yr_data = eq[eq.index.year == year] + if len(yr_data) < 2: + print(f" | {'—':>24}", end="") + continue + start_val = yr_data.iloc[0] + end_val = yr_data.iloc[-1] + ret = end_val / start_val - 1 + year_end_vals[n] = end_val + # Show both return % and portfolio value + print(f" | {ret:>+7.1%} ${end_val:>12,.0f}", end="") + print() + + # Summary rows + print(f" {'-'*6}", end="") + for _ in names: + print(f"-+-{'-'*24}", end="") + print() + + # Total return + print(f" {'Total':<6}", end="") + for n in names: + eq = equities[n] + total = eq.iloc[-1] / INITIAL - 1 + print(f" | {total:>+7.0%} ${eq.iloc[-1]:>12,.0f}", end="") + print() + + # CAGR + print(f" {'CAGR':<6}", end="") + for n in names: + eq = equities[n] + ny = len(eq) / 252 + total = eq.iloc[-1] / INITIAL - 1 + cagr = (1 + total) ** (1 / ny) - 1 + print(f" | {cagr:>+7.1%} {'':>12}", end="") + print() + + # Sharpe + print(f" {'Sharpe':<6}", end="") + for n in names: + eq = equities[n] + dr = eq.pct_change().dropna() + sh = dr.mean() / dr.std() * np.sqrt(252) if dr.std() > 0 else 0 + print(f" | {sh:>7.2f} {'':>12}", end="") + print() + + # Max DD + print(f" {'MaxDD':<6}", end="") + for n in names: + eq = equities[n] + rm = eq.cummax() + dd = ((eq - rm) / rm).min() + print(f" | {dd:>+7.1%} {'':>12}", end="") + print() + + # Best/Worst year + print(f" {'Best':<6}", end="") + for n in names: + eq = equities[n] + dr = eq.pct_change().fillna(0) + yr_rets = {y: float((1 + dr[dr.index.year == y]).prod() - 1) for y in all_years} + # skip warmup year + active = {y: r for y, r in yr_rets.items() if abs(r) > 0.001} + if active: + best_y = max(active, key=active.get) + print(f" | 
{active[best_y]:>+7.1%} ({best_y}) ", end="") + else: + print(f" | {'—':>24}", end="") + print() + + print(f" {'Worst':<6}", end="") + for n in names: + eq = equities[n] + dr = eq.pct_change().fillna(0) + yr_rets = {y: float((1 + dr[dr.index.year == y]).prod() - 1) for y in all_years} + active = {y: r for y, r in yr_rets.items() if abs(r) > 0.001} + if active: + worst_y = min(active, key=active.get) + print(f" | {active[worst_y]:>+7.1%} ({worst_y}) ", end="") + else: + print(f" | {'—':>24}", end="") + print() + + +def main(): + # ===== US ===== + prices_us = data_manager.load("us") + bench_us = prices_us["SPY"].dropna() + stocks_us = prices_us.drop(columns=["SPY"], errors="ignore") + + eq_spy = bench_us / bench_us.iloc[0] * INITIAL + + us_strats = { + "rec_mfilt+deep×upvol": combo([ + (f_rec_mom_filtered, 0.5), + (combo([(f_rec_126, 0.5), (f_up_volume_proxy, 0.5)]), 0.5), + ]), + "ma200+mom7m+rec126": combo([ + (f_above_ma200, 0.33), (f_mom_intermediate, 0.33), (f_rec_126, 0.34) + ]), + "rec_mfilt+ma200": combo([ + (f_rec_mom_filtered, 0.5), (f_above_ma200, 0.5) + ]), + "mom7m+rec126": combo([ + (f_mom_intermediate, 0.5), (f_rec_126, 0.5) + ]), + "BASELINE:rec+mom": f_rec_mom, + } + + us_equities = {} + for name, func in us_strats.items(): + us_equities[name] = run_equity(func, stocks_us) + us_equities["SPY (Benchmark)"] = eq_spy + + yearly_table(us_equities, "US MARKET — Champion Strategies vs SPY — $10,000 Starting Capital") + + # ===== CN ===== + prices_cn = data_manager.load("cn") + bench_cn = prices_cn["000300.SS"].dropna() if "000300.SS" in prices_cn.columns else None + stocks_cn = prices_cn.drop(columns=["000300.SS"], errors="ignore") + + cn_strats = { + "up_cap+quality_mom": combo([ + (f_up_capture, 0.5), (f_quality_mom, 0.5) + ]), + "down_resil+qual_mom": combo([ + (f_down_resilience, 0.5), (f_quality_mom, 0.5) + ]), + "rec63+mom×gap": combo([ + (f_rec_63, 0.5), (f_mom_x_gap, 0.5) + ]), + "up_cap+mom×gap": combo([ + (f_up_capture, 0.5), (f_mom_x_gap, 0.5) + ]), + "BASELINE:rec+mom": f_rec_mom, + } + + cn_equities = {} + for name, func in cn_strats.items(): + cn_equities[name] = run_equity(func, stocks_cn) + if bench_cn is not None: + cn_equities["CSI300 (Benchmark)"] = bench_cn / bench_cn.iloc[0] * INITIAL + + yearly_table(cn_equities, "CN MARKET — Champion Strategies vs CSI 300 — $10,000 Starting Capital") + + +if __name__ == "__main__": + main() diff --git a/strategies/factor_combo.py b/strategies/factor_combo.py new file mode 100644 index 0000000..c36cdc5 --- /dev/null +++ b/strategies/factor_combo.py @@ -0,0 +1,218 @@ +""" +Factor combination strategies discovered through iterative factor research. + +US champions: + - rec_mfilt+deep×upvol: Recovery (momentum-filtered) + deep recovery × up-volume + - ma200+mom7m+rec126: Above MA200 + intermediate momentum + deep recovery + - rec_mfilt+ma200: Recovery (momentum-filtered) + above MA200 + - mom7m+rec126: Intermediate momentum + deep recovery + +CN champions: + - up_cap+quality_mom: Up-capture ratio + quality momentum composite + - down_resil+qual_mom: Down-resilience + quality momentum composite + - rec63+mom×gap: Recovery 63d + momentum × gap-up frequency + - up_cap+mom×gap: Up-capture + momentum × gap-up frequency + +Each can run at daily/weekly/biweekly/monthly rebalancing frequency. 
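+
+Usage sketch (assumes a dates x tickers close-price DataFrame, as used by the
+other strategies; signal names are the SIGNAL_REGISTRY keys defined below):
+
+    strat = FactorComboStrategy("mom7m+rec126", rebal_freq=5, top_n=10)
+    weights = strat.generate_signals(prices)  # post-warmup rows sum to ~1.0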
+""" + +import numpy as np +import pandas as pd +from strategies.base import Strategy + + +# --------------------------------------------------------------------------- +# Factor building blocks +# --------------------------------------------------------------------------- + +def _mom_12_1(p): + return p.shift(21).pct_change(231) + + +def _mom_intermediate(p): + return p.shift(21).pct_change(147) + + +def _rec_63(p): + return p / p.rolling(63, min_periods=63).min() - 1 + + +def _rec_126(p): + return p / p.rolling(126, min_periods=126).min() - 1 + + +def _above_ma200(p): + return p / p.rolling(200, min_periods=200).mean() - 1 + + +def _up_volume_proxy(p): + ret = p.pct_change() + return ret.where(ret > 0, 0).rolling(20, min_periods=15).sum() + + +def _gap_up_freq(p): + ret = p.pct_change() + return (ret > 0.01).astype(float).rolling(60, min_periods=40).mean() + + +def _consistent_returns(p): + ret = p.pct_change() + return (ret > 0).astype(float).rolling(252, min_periods=126).mean() + + +def _rec_mom_filtered(p): + rec = p / p.rolling(126, min_periods=126).min() - 1 + mom = p.shift(21).pct_change(105) + return rec.where(mom > 0, np.nan) + + +def _up_capture(p): + ret = p.pct_change() + mkt = ret.mean(axis=1) + up_mkt = mkt > 0 + arr = ret.values.copy() + arr[~up_mkt.values, :] = np.nan + stock_up = pd.DataFrame(arr, index=ret.index, columns=ret.columns) + mkt_up_vals = mkt.where(up_mkt, np.nan) + stock_avg = stock_up.rolling(60, min_periods=20).mean() + mkt_avg = mkt_up_vals.rolling(60, min_periods=20).mean() + return stock_avg.div(mkt_avg, axis=0) + + +def _down_resilience(p): + ret = p.pct_change() + mkt = ret.mean(axis=1) + down_mkt = mkt < 0 + arr = ret.values.copy() + arr[~down_mkt.values, :] = np.nan + down_ret = pd.DataFrame(arr, index=ret.index, columns=ret.columns) + return -down_ret.rolling(120, min_periods=30).mean() + + +def _quality_mom(p): + mom_r = _mom_12_1(p).rank(axis=1, pct=True, na_option="keep") + con_r = _consistent_returns(p).rank(axis=1, pct=True, na_option="keep") + up_r = _up_volume_proxy(p).rank(axis=1, pct=True, na_option="keep") + return 0.4 * mom_r + 0.3 * con_r + 0.3 * up_r + + +def _mom_x_gap(p): + mom_r = _mom_12_1(p).rank(axis=1, pct=True, na_option="keep") + gap_r = _gap_up_freq(p).rank(axis=1, pct=True, na_option="keep") + return mom_r * gap_r + + +# --------------------------------------------------------------------------- +# Combo signal constructors (weighted rank sums) +# --------------------------------------------------------------------------- + +def _rank(df): + return df.rank(axis=1, pct=True, na_option="keep") + + +# US combos +def signal_rec_mfilt_deep_upvol(p): + rec_mfilt_r = _rank(_rec_mom_filtered(p)) + deep_upvol_r = _rank(_rec_126(p)) * _rank(_up_volume_proxy(p)) + deep_upvol_rr = _rank(deep_upvol_r) + return 0.5 * rec_mfilt_r + 0.5 * deep_upvol_rr + + +def signal_ma200_mom7m_rec126(p): + return (0.33 * _rank(_above_ma200(p)) + + 0.33 * _rank(_mom_intermediate(p)) + + 0.34 * _rank(_rec_126(p))) + + +def signal_rec_mfilt_ma200(p): + return 0.5 * _rank(_rec_mom_filtered(p)) + 0.5 * _rank(_above_ma200(p)) + + +def signal_mom7m_rec126(p): + return 0.5 * _rank(_mom_intermediate(p)) + 0.5 * _rank(_rec_126(p)) + + +# CN combos +def signal_up_cap_quality_mom(p): + return 0.5 * _rank(_up_capture(p)) + 0.5 * _rank(_quality_mom(p)) + + +def signal_down_resil_qual_mom(p): + return 0.5 * _rank(_down_resilience(p)) + 0.5 * _rank(_quality_mom(p)) + + +def signal_rec63_mom_gap(p): + return 0.5 * _rank(_rec_63(p)) + 0.5 * _rank(_mom_x_gap(p)) + + +def 
+
+
+# ---------------------------------------------------------------------------
+# Signal registry: name -> callable(prices) -> DataFrame
+# ---------------------------------------------------------------------------
+
+SIGNAL_REGISTRY = {
+    # US
+    "rec_mfilt+deep_upvol": signal_rec_mfilt_deep_upvol,
+    "ma200+mom7m+rec126": signal_ma200_mom7m_rec126,
+    "rec_mfilt+ma200": signal_rec_mfilt_ma200,
+    "mom7m+rec126": signal_mom7m_rec126,
+    # CN
+    "up_cap+quality_mom": signal_up_cap_quality_mom,
+    "down_resil+qual_mom": signal_down_resil_qual_mom,
+    "rec63+mom_gap": signal_rec63_mom_gap,
+    "up_cap+mom_gap": signal_up_cap_mom_gap,
+}
+
+
+# ---------------------------------------------------------------------------
+# Strategy class
+# ---------------------------------------------------------------------------
+
+class FactorComboStrategy(Strategy):
+    """
+    Generic factor-combination strategy with configurable rebalancing frequency.
+
+    Parameters:
+        signal_name: key into SIGNAL_REGISTRY
+        rebal_freq: rebalancing interval in trading days (1=daily, 5=weekly, 10=biweekly, 21=monthly)
+        top_n: number of stocks to hold
+    """
+
+    REBAL_LABELS = {1: "daily", 5: "weekly", 10: "biweekly", 21: "monthly"}
+
+    def __init__(self, signal_name: str, rebal_freq: int = 21, top_n: int = 10):
+        if signal_name not in SIGNAL_REGISTRY:
+            raise ValueError(f"Unknown signal: {signal_name}. "
+                             f"Available: {list(SIGNAL_REGISTRY.keys())}")
+        self.signal_name = signal_name
+        self.signal_func = SIGNAL_REGISTRY[signal_name]
+        self.rebal_freq = rebal_freq
+        self.top_n = top_n
+
+    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
+        sig = self.signal_func(data)
+
+        # Select top_n by signal rank
+        rank = sig.rank(axis=1, ascending=False, na_option="bottom")
+        n_valid = sig.notna().sum(axis=1)
+        enough = n_valid >= self.top_n
+        top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)
+
+        raw = top_mask.astype(float)
+        row_sums = raw.sum(axis=1).replace(0, np.nan)
+        signals = raw.div(row_sums, axis=0).fillna(0.0)
+
+        # Rebalance at configured frequency
+        warmup = 252
+        rebal_mask = pd.Series(False, index=data.index)
+        rebal_indices = list(range(warmup, len(data), self.rebal_freq))
+        rebal_mask.iloc[rebal_indices] = True
+
+        signals[~rebal_mask] = np.nan
+        signals = signals.ffill().fillna(0.0)
+        signals.iloc[:warmup] = 0.0
+
+        return signals.shift(1).fillna(0.0)
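+
+# Rebalancing mechanics, worked through for the defaults (illustrative):
+# with rebal_freq=21 and warmup=252, weights are recomputed on bars
+# list(range(252, len(data), 21)) -> 252, 273, 294, ... and forward-filled in
+# between; the final shift(1) means a signal formed at the close of day t is
+# held from day t+1.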
diff --git a/trader.py b/trader.py
index 1df7f16..f20959a 100644
--- a/trader.py
+++ b/trader.py
@@ -40,6 +40,7 @@ import yfinance as yf
 import data_manager
 from strategies.buy_and_hold import BuyAndHoldStrategy
 from strategies.dual_momentum import DualMomentumStrategy
+from strategies.factor_combo import FactorComboStrategy
 from strategies.inverse_vol import InverseVolatilityStrategy
 from strategies.momentum import MomentumStrategy
 from strategies.momentum_quality import MomentumQualityStrategy
@@ -52,6 +53,7 @@ from universe import UNIVERSES
 # ---------------------------------------------------------------------------
 
 STRATEGY_REGISTRY = {
+    # --- Original strategies ---
     "recovery_mom_top10": lambda **kw: RecoveryMomentumStrategy(top_n=10),
     "recovery_mom_top20": lambda **kw: RecoveryMomentumStrategy(top_n=20),
     "recovery_mom_top50": lambda **kw: RecoveryMomentumStrategy(top_n=50),
@@ -61,6 +63,40 @@ STRATEGY_REGISTRY = {
     "inverse_vol": lambda **kw: InverseVolatilityStrategy(vol_window=20),
     "trend_following": lambda **kw: TrendFollowingStrategy(top_n=kw.get("top_n", 20)),
     "buy_and_hold": lambda **kw: BuyAndHoldStrategy(),
+    # --- Factor combo: US champions ---
+    "fc_rec_mfilt_deep_upvol_daily": lambda **kw: FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=1),
+    "fc_rec_mfilt_deep_upvol_weekly": lambda **kw: FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=5),
+    "fc_rec_mfilt_deep_upvol_biweekly": lambda **kw: FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=10),
+    "fc_rec_mfilt_deep_upvol_monthly": lambda **kw: FactorComboStrategy("rec_mfilt+deep_upvol", rebal_freq=21),
+    "fc_ma200_mom7m_rec126_daily": lambda **kw: FactorComboStrategy("ma200+mom7m+rec126", rebal_freq=1),
+    "fc_ma200_mom7m_rec126_weekly": lambda **kw: FactorComboStrategy("ma200+mom7m+rec126", rebal_freq=5),
+    "fc_ma200_mom7m_rec126_biweekly": lambda **kw: FactorComboStrategy("ma200+mom7m+rec126", rebal_freq=10),
+    "fc_ma200_mom7m_rec126_monthly": lambda **kw: FactorComboStrategy("ma200+mom7m+rec126", rebal_freq=21),
+    "fc_rec_mfilt_ma200_daily": lambda **kw: FactorComboStrategy("rec_mfilt+ma200", rebal_freq=1),
+    "fc_rec_mfilt_ma200_weekly": lambda **kw: FactorComboStrategy("rec_mfilt+ma200", rebal_freq=5),
+    "fc_rec_mfilt_ma200_biweekly": lambda **kw: FactorComboStrategy("rec_mfilt+ma200", rebal_freq=10),
+    "fc_rec_mfilt_ma200_monthly": lambda **kw: FactorComboStrategy("rec_mfilt+ma200", rebal_freq=21),
+    "fc_mom7m_rec126_daily": lambda **kw: FactorComboStrategy("mom7m+rec126", rebal_freq=1),
+    "fc_mom7m_rec126_weekly": lambda **kw: FactorComboStrategy("mom7m+rec126", rebal_freq=5),
+    "fc_mom7m_rec126_biweekly": lambda **kw: FactorComboStrategy("mom7m+rec126", rebal_freq=10),
+    "fc_mom7m_rec126_monthly": lambda **kw: FactorComboStrategy("mom7m+rec126", rebal_freq=21),
+    # --- Factor combo: CN champions ---
+    "fc_up_cap_quality_mom_daily": lambda **kw: FactorComboStrategy("up_cap+quality_mom", rebal_freq=1),
+    "fc_up_cap_quality_mom_weekly": lambda **kw: FactorComboStrategy("up_cap+quality_mom", rebal_freq=5),
+    "fc_up_cap_quality_mom_biweekly": lambda **kw: FactorComboStrategy("up_cap+quality_mom", rebal_freq=10),
+    "fc_up_cap_quality_mom_monthly": lambda **kw: FactorComboStrategy("up_cap+quality_mom", rebal_freq=21),
+    "fc_down_resil_qual_mom_daily": lambda **kw: FactorComboStrategy("down_resil+qual_mom", rebal_freq=1),
+    "fc_down_resil_qual_mom_weekly": lambda **kw: FactorComboStrategy("down_resil+qual_mom", rebal_freq=5),
+    "fc_down_resil_qual_mom_biweekly": lambda **kw: FactorComboStrategy("down_resil+qual_mom", rebal_freq=10),
+    "fc_down_resil_qual_mom_monthly": lambda **kw: FactorComboStrategy("down_resil+qual_mom", rebal_freq=21),
+    "fc_rec63_mom_gap_daily": lambda **kw: FactorComboStrategy("rec63+mom_gap", rebal_freq=1),
+    "fc_rec63_mom_gap_weekly": lambda **kw: FactorComboStrategy("rec63+mom_gap", rebal_freq=5),
+    "fc_rec63_mom_gap_biweekly": lambda **kw: FactorComboStrategy("rec63+mom_gap", rebal_freq=10),
+    "fc_rec63_mom_gap_monthly": lambda **kw: FactorComboStrategy("rec63+mom_gap", rebal_freq=21),
+    "fc_up_cap_mom_gap_daily": lambda **kw: FactorComboStrategy("up_cap+mom_gap", rebal_freq=1),
+    "fc_up_cap_mom_gap_weekly": lambda **kw: FactorComboStrategy("up_cap+mom_gap", rebal_freq=5),
+    "fc_up_cap_mom_gap_biweekly": lambda **kw: FactorComboStrategy("up_cap+mom_gap", rebal_freq=10),
+    "fc_up_cap_mom_gap_monthly": lambda **kw: FactorComboStrategy("up_cap+mom_gap", rebal_freq=21),
 }
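
A quick way to sanity-check one of the new registry entries offline (an
illustrative sketch only; it reuses data_manager.load and the benchmark-column
drop from factor_backtest.py above, and touches no trader state files):

    import data_manager
    from trader import STRATEGY_REGISTRY

    strat = STRATEGY_REGISTRY["fc_up_cap_quality_mom_monthly"]()
    prices = data_manager.load("cn").drop(columns=["000300.SS"], errors="ignore")
    weights = strat.generate_signals(prices)
    print(weights.tail())  # latest target weights: 10 names at 10% each, or all cash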