"""Backtest best factor combinations with yearly return breakdown.

Factor combinations wired up in ``run``:

US best: momentum + recovery + low_downside_beta + short_term_reversal
CN best: momentum + 52w_high_distance + turnover_reversal
         (printed as "Mom+Near52wHigh+VolReversal")

NOTE(review): ``factor_lottery_demand`` and ``metrics`` are imported but not
used anywhere in this file; they are kept in case other code relies on them.
"""
from __future__ import annotations

import argparse

import numpy as np
import pandas as pd

import data_manager
import metrics
from universe import UNIVERSES
from factor_research import (
    factor_momentum_12_1,
    factor_recovery,
    factor_short_term_reversal,
    factor_downside_beta_proxy,
    factor_lottery_demand,
    factor_turnover_reversal,
    factor_52w_high_distance,
)

# Every equity curve in this module is expressed relative to this capital.
INITIAL_CAPITAL = 100000
# Trading days per year: default warmup length and Sharpe annualisation.
TRADING_DAYS_PER_YEAR = 252
# Windows with fewer observations than this are skipped by ``run``.
MIN_WINDOW_OBS = 50


def build_strategy_signals(
    prices: pd.DataFrame,
    factor_funcs: list,
    weights: list[float],
    top_n: int = 10,
    rebal_freq: int = 21,
    warmup: int = TRADING_DAYS_PER_YEAR,
) -> pd.DataFrame:
    """Build equal-weight top-N strategy from ranked factor combination.

    Each factor function is called on ``prices``; its output is converted to
    a cross-sectional percentile rank and the ranks are combined with
    ``weights``. On every rebalance row the ``top_n`` highest composite
    scores receive equal weight; the weights are held until the next
    rebalance and finally lagged by one row.

    Args:
        prices: Price frame passed to every factor function.
        factor_funcs: Callables mapping ``prices`` to a factor frame.
        weights: One weight per entry of ``factor_funcs``.
        top_n: Number of names to hold.
        rebal_freq: Number of rows between rebalances.
        warmup: Number of leading rows with no position; the first
            rebalance happens on row ``warmup``.

    Returns:
        Frame of portfolio weights, shifted by one row so that a weight is
        only applied to the return of the row after it was computed.

    Raises:
        ValueError: If ``factor_funcs`` is empty, its length differs from
            ``weights``, or ``top_n``/``rebal_freq``/``warmup`` are out of
            range.
    """
    if not factor_funcs:
        raise ValueError("factor_funcs must contain at least one factor")
    if len(factor_funcs) != len(weights):
        # zip() would silently drop the unmatched factors or weights.
        raise ValueError(
            f"got {len(factor_funcs)} factor functions "
            f"but {len(weights)} weights"
        )
    if top_n < 1:
        raise ValueError("top_n must be >= 1")
    if rebal_freq < 1:
        raise ValueError("rebal_freq must be >= 1")
    if warmup < 0:
        raise ValueError("warmup must be >= 0")

    factor_values = [func(prices) for func in factor_funcs]
    ranked = [
        values.rank(axis=1, pct=True, na_option="keep")
        for values in factor_values
    ]
    # A NaN in any single factor makes the composite NaN for that cell.
    composite = sum(w * r for w, r in zip(weights, ranked))

    # Rank 1 is the best composite score; NaN cells are pushed to the bottom.
    rank = composite.rank(axis=1, ascending=False, na_option="bottom")
    # Rows with fewer than top_n scored names hold nothing at all.
    enough = composite.notna().sum(axis=1) >= top_n
    top_mask = (rank <= top_n) & enough.values.reshape(-1, 1)

    raw = top_mask.astype(float)
    row_sums = raw.sum(axis=1).replace(0, np.nan)
    signals = raw.div(row_sums, axis=0).fillna(0.0)

    # Keep the weights only on rebalance rows and carry them forward between
    # rebalances.
    rebal_mask = pd.Series(False, index=prices.index)
    rebal_mask.iloc[warmup::rebal_freq] = True
    signals[~rebal_mask] = np.nan
    signals = signals.ffill().fillna(0.0)
    signals.iloc[:warmup] = 0.0

    return signals.shift(1).fillna(0.0)


def backtest_equity(
    signals: pd.DataFrame,
    prices: pd.DataFrame,
    cost: float = 0.001,
) -> pd.Series:
    """Simple vectorized backtest returning equity curve.

    Args:
        signals: Portfolio weights, as returned by
            ``build_strategy_signals``.
        prices: Price frame with the same rows and columns as ``signals``.
        cost: Cost charged per unit of turnover (sum of absolute weight
            changes).

    Returns:
        Equity curve: cumulative product of the net returns scaled by
        ``INITIAL_CAPITAL``.
    """
    # Forward-fill explicitly: pct_change() used to pad gaps implicitly, but
    # that default is deprecated in pandas. Spelling it out keeps the result
    # identical on every pandas version.
    returns = prices.ffill().pct_change().fillna(0.0)
    gross = (signals * returns).sum(axis=1)

    # Transaction costs via turnover.
    turnover = signals.diff().abs().sum(axis=1)
    net = gross - turnover * cost

    return (1 + net).cumprod() * INITIAL_CAPITAL


def yearly_returns(equity: pd.Series) -> pd.DataFrame:
    """Compute calendar year returns from equity curve.

    Args:
        equity: Equity curve; its index must expose ``.year``.

    Returns:
        Frame indexed by ``year`` with columns ``return``, ``max_dd``
        (largest drawdown measured from the running peak within that year),
        ``start_val`` and ``end_val``. Empty when ``equity`` is empty.
    """
    columns = ["year", "return", "max_dd", "start_val", "end_val"]
    if equity.empty:
        return pd.DataFrame(columns=columns).set_index("year")

    daily_ret = equity.pct_change().fillna(0)
    years = daily_ret.index.year

    rows = []
    for year in sorted(years.unique()):
        in_year = years == year
        eq_yr = equity[in_year]
        # The running peak restarts each year, so this is an intra-year
        # drawdown.
        peak = eq_yr.cummax()
        drawdown = (eq_yr - peak) / peak
        rows.append({
            "year": year,
            "return": (1 + daily_ret[in_year]).prod() - 1,
            "max_dd": drawdown.min(),
            "start_val": float(eq_yr.iloc[0]),
            "end_val": float(eq_yr.iloc[-1]),
        })

    return pd.DataFrame(rows, columns=columns).set_index("year")


def _rebase(equity: pd.Series) -> pd.Series:
    """Rescale a non-empty curve so its first value is INITIAL_CAPITAL."""
    return equity / equity.iloc[0] * INITIAL_CAPITAL


def _strategy_config(market: str) -> tuple:
    """Return (label, funcs, weights) for the strategy and its baseline."""
    if market == "us":
        return (
            "Mom+Recovery+LowDBeta+STR",
            [
                factor_momentum_12_1,
                factor_recovery,
                factor_downside_beta_proxy,
                factor_short_term_reversal,
            ],
            [0.25, 0.25, 0.25, 0.25],
            "Recovery+Mom (baseline)",
            [factor_momentum_12_1, factor_recovery],
            [0.5, 0.5],
        )
    return (
        "Mom+Near52wHigh+VolReversal",
        [
            factor_momentum_12_1,
            factor_52w_high_distance,
            factor_turnover_reversal,
        ],
        [0.40, 0.30, 0.30],
        "Mom+Recovery (baseline)",
        [factor_momentum_12_1, factor_recovery],
        [0.5, 0.5],
    )


def run(market: str, years_list: list[int]) -> None:
    """Backtest the market's factor combination and print yearly tables.

    For every look-back window in ``years_list`` the strategy, a two-factor
    baseline and (when present in the price frame) the benchmark are rebased
    to ``INITIAL_CAPITAL`` at the window start and compared year by year,
    followed by total return, CAGR, Sharpe, max drawdown and win rate.

    Args:
        market: Key into ``UNIVERSES``; ``"us"`` selects the US combination,
            any other value the CN combination.
        years_list: Look-back windows in years, measured back from the last
            row of the price data.
    """
    config = UNIVERSES[market]
    benchmark = config["benchmark"]

    print(f"Loading {market.upper()} price data...")
    # presumably a date-indexed frame of prices, one column per ticker
    # TODO confirm against data_manager
    prices = data_manager.load(market)
    bench_prices = prices[benchmark] if benchmark in prices.columns else None
    stocks = prices.drop(columns=[benchmark], errors="ignore")

    (
        label,
        factor_funcs,
        weights,
        baseline_label,
        baseline_funcs,
        baseline_weights,
    ) = _strategy_config(market)

    # The benchmark curve does not depend on top_n or the window, so build
    # it once. A benchmark column with no data counts as "no benchmark".
    eq_bench = None
    if bench_prices is not None:
        bp = bench_prices.dropna()
        if not bp.empty:
            eq_bench = _rebase(bp)

    separator = f" {'-'*6}-+-{'-'*30}-+-{'-'*25}-+-{'-'*12}-+-{'-'*14}"

    for top_n in (10,):
        print(f"\n{'='*90}")
        print(f" {market.upper()} — Top {top_n} — {label}")
        print(f"{'='*90}")

        # Best combo
        sig = build_strategy_signals(
            stocks, factor_funcs, weights, top_n=top_n
        )
        eq = backtest_equity(sig, stocks)

        # Baseline
        sig_base = build_strategy_signals(
            stocks, baseline_funcs, baseline_weights, top_n=top_n
        )
        eq_base = backtest_equity(sig_base, stocks)

        for n_years in years_list:
            cutoff = stocks.index[-1] - pd.DateOffset(years=n_years)
            eq_slice = eq[eq.index >= cutoff]
            eq_base_slice = eq_base[eq_base.index >= cutoff]

            if len(eq_slice) < MIN_WINDOW_OBS:
                continue

            # Normalize to starting capital
            eq_norm = _rebase(eq_slice)
            eq_base_norm = _rebase(eq_base_slice)

            yr = yearly_returns(eq_norm)
            yr_base = yearly_returns(eq_base_norm)

            # The benchmark may have no observations inside this window.
            yr_bench = None
            if eq_bench is not None:
                eq_bench_slice = eq_bench[eq_bench.index >= cutoff]
                if not eq_bench_slice.empty:
                    yr_bench = yearly_returns(_rebase(eq_bench_slice))

            print(f"\n--- Last {n_years} Years (from {eq_slice.index[0].date()}) ---\n")

            # Combined table
            print(f" {'Year':<6} | {label:>30} | {baseline_label:>25} | {'Benchmark':>12} | {'Alpha vs Bench':>14}")
            print(separator)

            total_new = 1.0
            total_base = 1.0
            total_bench = 1.0

            for y in sorted(yr.index.tolist()):
                r_new = yr.loc[y, "return"]
                dd_new = yr.loc[y, "max_dd"]
                r_base = yr_base.loc[y, "return"] if y in yr_base.index else 0
                if yr_bench is not None and y in yr_bench.index:
                    r_bench = yr_bench.loc[y, "return"]
                else:
                    r_bench = 0
                alpha = r_new - r_bench

                total_new *= (1 + r_new)
                total_base *= (1 + r_base)
                total_bench *= (1 + r_bench)

                print(f" {y:<6} | {r_new:>+14.2%} (dd {dd_new:>+7.2%}) | {r_base:>+25.2%} | {r_bench:>+12.2%} | {alpha:>+14.2%}")

            total_r_new = total_new - 1
            total_r_base = total_base - 1
            total_r_bench = total_bench - 1

            # Annualise over the time actually covered by the slice: when
            # the history is shorter than the requested window, dividing by
            # n_years would understate the growth rate.
            span_years = (eq_slice.index[-1] - eq_slice.index[0]).days / 365.25
            if span_years <= 0:
                span_years = n_years
            cagr_new = (total_new ** (1 / span_years)) - 1
            cagr_base = (total_base ** (1 / span_years)) - 1
            cagr_bench = (total_bench ** (1 / span_years)) - 1

            print(separator)
            print(f" {'Total':<6} | {total_r_new:>+14.2%}{' '*16} | {total_r_base:>+25.2%} | {total_r_bench:>+12.2%} |")
            print(f" {'CAGR':<6} | {cagr_new:>+14.2%}{' '*16} | {cagr_base:>+25.2%} | {cagr_bench:>+12.2%} |")

            # Risk metrics for this window (eq_norm only exists per window).
            print(f"\n Full metrics ({label}):")
            daily_ret = eq_norm.pct_change().dropna()
            vol = daily_ret.std()
            # A flat or single-point curve has no volatility; report 0.
            if vol > 0:
                sharpe = daily_ret.mean() / vol * np.sqrt(TRADING_DAYS_PER_YEAR)
            else:
                sharpe = 0
            running_max = eq_norm.cummax()
            max_dd = ((eq_norm - running_max) / running_max).min()
            print(f" Sharpe: {sharpe:.2f} | Max Drawdown: {max_dd:.2%} | Win Rate: {(daily_ret > 0).mean():.2%}")


def main() -> None:
    """Parse ``--market`` and run the 3/5/10-year breakdown."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--market", default="us", choices=["us", "cn"])
    args = parser.parse_args()
    run(args.market, years_list=[3, 5, 10])


if __name__ == "__main__":
    main()