"""
Factor research v2 — Portfolio-first approach.

Instead of IC → portfolio, we go directly to:
  1. Build factor signal
  2. Select top-N stocks
  3. Run real backtest with transaction costs
  4. Measure CAGR, Sharpe, MaxDD, yearly returns

Tests single factors AND combinations.
Compares everything against the baseline recovery+momentum strategy.
"""

from __future__ import annotations

import argparse
import itertools
import warnings
from typing import Callable

import numpy as np
import pandas as pd

import data_manager
import metrics
from universe import UNIVERSES

warnings.filterwarnings("ignore")

# Every factor maps a price frame to a score frame of the same layout.
SignalFunc = Callable[[pd.DataFrame], pd.DataFrame]

# Labels used as dictionary keys / table index entries throughout `run`.
BASELINE_LABEL = "BASELINE: recovery+mom"
BENCHMARK_LABEL = "BENCHMARK"

# Starting value of every equity curve (strategies and benchmark alike).
INITIAL_CAPITAL = 100000


# ---------------------------------------------------------------------------
# Factor signals — each returns DataFrame (dates x stocks), higher = better
# ---------------------------------------------------------------------------

def _daily_vol(p: pd.DataFrame, window: int, min_periods: int) -> pd.DataFrame:
    """Rolling standard deviation of 1-day returns."""
    return p.pct_change().rolling(window, min_periods=min_periods).std()


def f_momentum_12_1(p: pd.DataFrame) -> pd.DataFrame:
    """Return from 252 rows ago to 21 rows ago (skips the latest 21 rows)."""
    return p.shift(21).pct_change(231)


def f_recovery(p: pd.DataFrame) -> pd.DataFrame:
    """Gain of the current price over its rolling 63-row minimum."""
    return p / p.rolling(63, min_periods=63).min() - 1


def f_recovery_mom(p: pd.DataFrame) -> pd.DataFrame:
    """The baseline composite: 50/50 recovery + momentum ranks."""
    r1 = f_recovery(p).rank(axis=1, pct=True, na_option="keep")
    r2 = f_momentum_12_1(p).rank(axis=1, pct=True, na_option="keep")
    return 0.5 * r1 + 0.5 * r2


# --- New single factors ---

def f_short_term_reversal(p: pd.DataFrame) -> pd.DataFrame:
    """5-day return reversal."""
    return -p.pct_change(5)


def f_vol_adjusted_mom(p: pd.DataFrame) -> pd.DataFrame:
    """Momentum divided by recent volatility. Sharpe-like signal.

    Hypothesis: risk-adjusted momentum is more persistent.
    """
    mom = f_momentum_12_1(p)
    vol = _daily_vol(p, 60, 40)
    # Zero volatility would produce +/-inf scores; treat it as missing.
    return mom / vol.replace(0, np.nan)


def f_acceleration(p: pd.DataFrame) -> pd.DataFrame:
    """3-month momentum minus 12-month momentum.

    Hypothesis: accelerating stocks continue accelerating.
    """
    mom_3m = p.shift(5).pct_change(63 - 5)
    mom_12m = f_momentum_12_1(p)
    return mom_3m - mom_12m


def f_breakout(p: pd.DataFrame) -> pd.DataFrame:
    """Price relative to 20-day high. Close to 1 = breaking out.

    Hypothesis: breakouts from consolidation continue.
    """
    return p / p.rolling(20, min_periods=20).max()


def f_recovery_deep(p: pd.DataFrame) -> pd.DataFrame:
    """Recovery from 126-day (6 month) low instead of 63-day.

    Hypothesis: deeper recovery = stronger signal.
    """
    return p / p.rolling(126, min_periods=126).min() - 1


def f_recovery_rate(p: pd.DataFrame) -> pd.DataFrame:
    """Speed of recovery: 20-day change in recovery factor.

    Hypothesis: accelerating recovery predicts continuation.
    """
    recovery = f_recovery(p)
    return recovery - recovery.shift(20)


def f_drawdown_bounce(p: pd.DataFrame) -> pd.DataFrame:
    """20-day return, scored only for stocks recently in drawdown.

    A stock is "in drawdown" when it trades at least 10% below its rolling
    252-row high; it is scored if that was true on any of the last 20 rows.
    Everything else is NaN.

    Hypothesis: strong bounces from drawdowns persist.
    """
    rolling_max = p.rolling(252, min_periods=126).max()
    in_drawdown = p < rolling_max * 0.9  # at least 10% below peak
    bounce_20d = p.pct_change(20)
    # Only score stocks that were recently in drawdown
    was_in_drawdown = in_drawdown.rolling(20, min_periods=1).max().astype(bool)
    return bounce_20d.where(was_in_drawdown, np.nan)


def f_consistent_winner(p: pd.DataFrame) -> pd.DataFrame:
    """Share of the past 252 rows whose trailing 21-row return was positive.

    Hypothesis: stocks that win consistently are higher quality momentum.
    """
    monthly_ret = p.pct_change(21)
    # Keep missing returns as NaN: casting the comparison straight to float
    # would count "no data" as a loss and let `min_periods` pass on rows
    # that have no observations at all.
    is_win = (monthly_ret > 0).astype(float).where(monthly_ret.notna())
    return is_win.rolling(252, min_periods=126).mean()


def f_gap_up_freq(p: pd.DataFrame) -> pd.DataFrame:
    """Fraction of days with >1% gain in past 60 days.

    Hypothesis: frequent large gains = institutional buying.
    """
    ret = p.pct_change()
    # Missing returns stay NaN so they are excluded rather than counted as
    # "no big gain".
    big_gain = (ret > 0.01).astype(float).where(ret.notna())
    return big_gain.rolling(60, min_periods=40).mean()


def f_low_vol_mom(p: pd.DataFrame) -> pd.DataFrame:
    """Equal-weight blend of the momentum rank and the low-volatility rank.

    Hypothesis: low-vol momentum is more persistent.
    """
    mom = f_momentum_12_1(p).rank(axis=1, pct=True, na_option="keep")
    # Negated so that lower volatility receives the higher rank.
    vol = (-_daily_vol(p, 60, 40)).rank(axis=1, pct=True, na_option="keep")
    return 0.5 * mom + 0.5 * vol


def f_52w_channel_position(p: pd.DataFrame) -> pd.DataFrame:
    """Position within 252-day high-low channel. 1 = at high, 0 = at low.

    Hypothesis: stocks near highs continue (anchoring + trend).
    """
    high = p.rolling(252, min_periods=126).max()
    low = p.rolling(252, min_periods=126).min()
    # A flat channel (high == low) has no defined position.
    return (p - low) / (high - low).replace(0, np.nan)


def f_up_volume_proxy(p: pd.DataFrame) -> pd.DataFrame:
    """Proxy for accumulation: sum of returns on up days over 20 days.

    Without volume data, use magnitude of positive returns as proxy.
    """
    ret = p.pct_change()
    # clip() zeroes the down days but, unlike where(ret > 0, 0), leaves
    # missing returns as NaN so `min_periods` still applies to them.
    up_ret = ret.clip(lower=0)
    return up_ret.rolling(20, min_periods=15).sum()


def f_relative_strength_ma(p: pd.DataFrame) -> pd.DataFrame:
    """Average of the price's distance above its 50- and 200-day MAs.

    Dual MA trend strength.
    """
    ma50 = p.rolling(50, min_periods=50).mean()
    ma200 = p.rolling(200, min_periods=200).mean()
    above_50 = p / ma50 - 1
    above_200 = p / ma200 - 1
    return 0.5 * above_50 + 0.5 * above_200


def f_earnings_drift_proxy(p: pd.DataFrame) -> pd.DataFrame:
    """Proxy for post-earnings drift using 5-day return spikes.

    Identify large 5-day moves and bet on continuation.
    Hypothesis: large moves driven by information continue.
    """
    ret_5d = p.pct_change(5)
    # Daily volatility scaled to a 5-day horizon.
    vol = _daily_vol(p, 60, 40) * np.sqrt(5)
    z_score = ret_5d / vol.replace(0, np.nan)
    # Smooth: average z-score over past 60 days to capture multiple events
    return z_score.rolling(60, min_periods=20).mean()


# --- A-share specific ---

def f_reversal_vol_cn(p: pd.DataFrame) -> pd.DataFrame:
    """Short-term reversal amplified by volatility.

    High-vol oversold stocks bounce harder in A-shares.
    """
    ret_5d = p.pct_change(5)
    vol = _daily_vol(p, 20, 15)
    # Oversold (negative return) + high vol = positive score
    return -ret_5d * vol


def f_momentum_6_1(p: pd.DataFrame) -> pd.DataFrame:
    """6-1 month momentum. Shorter lookback may work better in A-shares."""
    return p.shift(21).pct_change(105)


def f_recovery_narrow(p: pd.DataFrame) -> pd.DataFrame:
    """Recovery from 21-day low. Faster recovery signal for A-shares."""
    return p / p.rolling(21, min_periods=21).min() - 1


def f_range_breakout_cn(p: pd.DataFrame) -> pd.DataFrame:
    """Breakout from 60-day range. Tuned for A-share volatility."""
    h60 = p.rolling(60, min_periods=40).max()
    l60 = p.rolling(60, min_periods=40).min()
    mid = (h60 + l60) / 2
    rng = (h60 - l60) / mid.replace(0, np.nan)
    position = (p - l60) / (h60 - l60).replace(0, np.nan)
    # Reward stocks breaking out of narrow ranges
    return position / rng.replace(0, np.nan)


# ---------------------------------------------------------------------------
# Strategy builder and backtester
# ---------------------------------------------------------------------------

def make_strategy(
    prices: pd.DataFrame,
    signal_func,
    top_n: int = 10,
    rebal_freq: int = 21,
    warmup: int = 252,
) -> pd.DataFrame:
    """Turn a factor signal into a rebalanced top-N equal-weight strategy.

    Args:
        prices: Price frame passed to ``signal_func``; its index and columns
            define the layout of the returned weights.
        signal_func: Callable mapping ``prices`` to a score frame
            (higher = better).
        top_n: Number of names held after each rebalance.
        rebal_freq: Rows between rebalances.
        warmup: Leading rows with no positions; the first rebalance happens
            on row ``warmup``.

    Returns:
        Weight frame shaped like the signal. Weights are lagged by one row,
        so the portfolio held on a row is built from the previous row's
        signal.

    Raises:
        ValueError: If ``top_n`` or ``rebal_freq`` is smaller than 1.
    """
    if top_n < 1:
        raise ValueError("top_n must be >= 1")
    if rebal_freq < 1:
        raise ValueError("rebal_freq must be >= 1")

    signal = signal_func(prices)

    # method="first" breaks ties by column order so exactly `top_n` names are
    # picked. With the default "average" method tied scores share one averaged
    # rank: a large tie at the top (e.g. many stocks sitting exactly on their
    # 20-day high) pushes every tied name past `top_n` and empties the
    # portfolio.
    rank = signal.rank(axis=1, method="first", ascending=False, na_option="bottom")

    # Rows with fewer than `top_n` scored names hold nothing; otherwise
    # NaN-scored names (ranked at the bottom) could be selected.
    n_valid = signal.notna().sum(axis=1)
    enough = n_valid >= top_n

    top_mask = (rank <= top_n) & enough.values.reshape(-1, 1)
    raw = top_mask.astype(float)
    row_sums = raw.sum(axis=1).replace(0, np.nan)
    weights = raw.div(row_sums, axis=0).fillna(0.0)

    # Keep target weights only on rebalance rows and carry them forward.
    rebal_mask = pd.Series(False, index=prices.index)
    rebal_indices = list(range(warmup, len(prices), rebal_freq))
    rebal_mask.iloc[rebal_indices] = True

    weights[~rebal_mask] = np.nan
    weights = weights.ffill().fillna(0.0)
    weights.iloc[:warmup] = 0.0

    # Lag by one row: trade on the row after the signal is observed.
    return weights.shift(1).fillna(0.0)


def combo_signal(funcs_and_weights: list[tuple[SignalFunc, float]]) -> SignalFunc:
    """Create a combined signal function from [(func, weight), ...].

    Each component signal is converted to a cross-sectional percentile rank
    and the ranks are summed with the given weights. A name that is NaN in
    any component is NaN in the combination.

    Raises:
        ValueError: If ``funcs_and_weights`` is empty.
    """
    # Snapshot so later mutation of the caller's list cannot change the
    # returned function.
    components = tuple(funcs_and_weights)
    if not components:
        raise ValueError("combo_signal requires at least one (func, weight) pair")

    def _combo(p: pd.DataFrame) -> pd.DataFrame:
        combined = None
        for func, w in components:
            term = w * func(p).rank(axis=1, pct=True, na_option="keep")
            combined = term if combined is None else combined + term
        return combined

    return _combo


def run_backtest(
    weights: pd.DataFrame,
    prices: pd.DataFrame,
    cost: float = 0.001,
) -> pd.Series:
    """Vectorized backtest returning equity curve.

    Args:
        weights: Portfolio weights, aligned with ``prices``.
        prices: Price frame; 1-row returns are derived from it and missing
            returns are treated as zero.
        cost: Proportional cost charged on the sum of absolute weight changes.

    Returns:
        Equity curve compounded from ``INITIAL_CAPITAL``.
    """
    returns = prices.pct_change().fillna(0.0)
    port_ret = (weights * returns).sum(axis=1)
    turnover = weights.diff().abs().sum(axis=1)
    port_ret -= turnover * cost
    return (1 + port_ret).cumprod() * INITIAL_CAPITAL


def compute_stats(equity: pd.Series, label: str) -> dict:
    """Compute strategy statistics.

    Returns a dict with the keys ``name``, ``cagr``, ``sharpe``, ``sortino``,
    ``maxdd``, ``calmar``, ``total`` and ``win_rate``. When there are fewer
    than 100 daily returns, or the returns have zero variance, every metric
    is NaN. Annualisation assumes 252 rows per year.
    """
    daily_ret = equity.pct_change().dropna()
    if len(daily_ret) < 100 or daily_ret.std() == 0:
        # Same keys as the regular result: callers index 'sortino' and
        # 'calmar' unconditionally when printing tables.
        return {
            "name": label,
            "cagr": np.nan,
            "sharpe": np.nan,
            "sortino": np.nan,
            "maxdd": np.nan,
            "calmar": np.nan,
            "total": np.nan,
            "win_rate": np.nan,
        }

    n_years = len(daily_ret) / 252
    total_ret = equity.iloc[-1] / equity.iloc[0] - 1
    cagr = (1 + total_ret) ** (1 / n_years) - 1
    sharpe = daily_ret.mean() / daily_ret.std() * np.sqrt(252)

    # Downside deviation is NaN with fewer than two losing days; the
    # comparison is then False and Sortino falls back to 0.
    sortino_denom = daily_ret[daily_ret < 0].std()
    sortino = daily_ret.mean() / sortino_denom * np.sqrt(252) if sortino_denom > 0 else 0

    running_max = equity.cummax()
    maxdd = ((equity - running_max) / running_max).min()
    calmar = cagr / abs(maxdd) if maxdd != 0 else 0
    win_rate = (daily_ret > 0).mean()

    return {
        "name": label,
        "cagr": cagr,
        "sharpe": sharpe,
        "sortino": sortino,
        "maxdd": maxdd,
        "calmar": calmar,
        "total": total_ret,
        "win_rate": win_rate,
    }


def yearly_returns(equity: pd.Series) -> dict[int, float]:
    """Compound the equity curve's daily returns per calendar year.

    Reads ``equity.index.year``, so the index must be datetime-like.
    """
    daily_ret = equity.pct_change().fillna(0)
    by_year = (1 + daily_ret).groupby(daily_ret.index.year).prod() - 1
    return {int(year): float(ret) for year, ret in by_year.items()}


def _single_factor_strategies(market: str) -> list[tuple[str, SignalFunc]]:
    """List the (label, signal function) pairs tested for ``market``."""
    strategies: list[tuple[str, SignalFunc]] = [
        # Baseline
        (BASELINE_LABEL, f_recovery_mom),
        # Single factors
        ("momentum_12_1", f_momentum_12_1),
        ("recovery", f_recovery),
        ("vol_adj_momentum", f_vol_adjusted_mom),
        ("acceleration", f_acceleration),
        ("breakout_20d", f_breakout),
        ("recovery_deep_126d", f_recovery_deep),
        ("recovery_rate", f_recovery_rate),
        ("drawdown_bounce", f_drawdown_bounce),
        ("consistent_winner", f_consistent_winner),
        ("gap_up_freq", f_gap_up_freq),
        ("low_vol_momentum", f_low_vol_mom),
        ("52w_channel_position", f_52w_channel_position),
        ("up_volume_proxy", f_up_volume_proxy),
        ("relative_strength_ma", f_relative_strength_ma),
        ("earnings_drift_proxy", f_earnings_drift_proxy),
    ]
    if market == "cn":
        strategies.extend([
            ("reversal_vol_cn", f_reversal_vol_cn),
            ("momentum_6_1", f_momentum_6_1),
            ("recovery_narrow_21d", f_recovery_narrow),
            ("range_breakout_cn", f_range_breakout_cn),
        ])
    return strategies


def _build_combos(
    top_singles: list[str],
    func_map: dict[str, SignalFunc],
) -> list[tuple[str, SignalFunc]]:
    """Build the baseline, all pairs of the top 6 singles, and 3-factor mixes."""
    # Baseline is always included
    combos: list[tuple[str, SignalFunc]] = [(BASELINE_LABEL, f_recovery_mom)]
    candidates = top_singles[:6]

    # Top2 combinations
    for n1, n2 in itertools.combinations(candidates, 2):
        label = f"{n1} + {n2}"
        combos.append((label, combo_signal([(func_map[n1], 0.5), (func_map[n2], 0.5)])))

    # Recovery+mom + each top single (3-factor)
    for name in candidates:
        # Already part of the recovery+momentum pair.
        if name in ("momentum_12_1", "recovery"):
            continue
        label = f"rec+mom + {name}"
        func = combo_signal([
            (f_recovery, 0.33),
            (f_momentum_12_1, 0.33),
            (func_map[name], 0.34),
        ])
        combos.append((label, func))
    return combos


def _backtest_all(
    named_funcs: list[tuple[str, SignalFunc]],
    stocks: pd.DataFrame,
    equities: dict[str, pd.Series],
) -> list[dict]:
    """Backtest every signal; store curves in ``equities`` and return stats."""
    stats = []
    for name, func in named_funcs:
        print(f" Running: {name}...")
        w = make_strategy(stocks, func, top_n=10)
        eq = run_backtest(w, stocks)
        equities[name] = eq
        stats.append(compute_stats(eq, name))
    return stats


def _print_yearly_table(
    yr_data: dict[str, dict[int, float]],
    col_names: list[str],
) -> None:
    """Print one row per year with one column per strategy."""
    all_years = sorted(set(y for yd in yr_data.values() for y in yd.keys()))

    # Print header
    header = f" {'Year':<6}" + "".join(f" | {c[:25]:>25}" for c in col_names)
    print(header)
    print(" " + "-" * (6 + 28 * len(col_names)))

    for year in all_years:
        line = f" {year:<6}"
        for c in col_names:
            # Years missing for a column are shown as a zero return.
            r = yr_data.get(c, {}).get(year, 0)
            line += f" | {r:>+24.1%}"
        print(line)


def _print_period_cagrs(
    equities: dict[str, pd.Series],
    col_names: list[str],
    last_date: pd.Timestamp,
) -> None:
    """Print trailing 3/5/10-year CAGRs for the given equity curves."""
    for n_years in [3, 5, 10]:
        cutoff = last_date - pd.DateOffset(years=n_years)
        print(f"\n --- {n_years}-year CAGR ---")
        for name in col_names:
            eq = equities.get(name)
            if eq is None:
                continue
            eq_slice = eq[eq.index >= cutoff]
            if len(eq_slice) < 50:
                continue
            # Annualise over the span the slice actually covers: when the
            # history is shorter than the nominal window, using `n_years`
            # as the exponent would misstate the annual rate.
            elapsed_years = (eq_slice.index[-1] - eq_slice.index[0]).days / 365.25
            if elapsed_years <= 0:
                continue
            total = eq_slice.iloc[-1] / eq_slice.iloc[0] - 1
            cagr = (1 + total) ** (1 / elapsed_years) - 1
            print(f" {name[:40]:<40} {cagr:>+8.1%}")


def run(market: str) -> None:
    """Run single-factor and combination backtests for ``market`` and print
    the result tables, yearly returns and trailing-period CAGRs.

    ``market`` must be a key of ``UNIVERSES``.
    """
    config = UNIVERSES[market]
    benchmark = config["benchmark"]

    print(f"Loading {market.upper()} price data...")
    # NOTE(review): assumes data_manager.load returns a dates x tickers price
    # frame with a DatetimeIndex — confirm against data_manager.
    prices = data_manager.load(market)

    bench = prices[benchmark].dropna() if benchmark in prices.columns else None
    if bench is not None and bench.empty:
        # A benchmark column with no data cannot be normalised.
        bench = None
    stocks = prices.drop(columns=[benchmark], errors="ignore")

    print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} days")
    print(f"Period: {stocks.index[0].date()} to {stocks.index[-1].date()}\n")

    # --- Define all strategies to test ---
    strategies = _single_factor_strategies(market)

    # Run all single-factor backtests
    print("=" * 110)
    print(f" SINGLE FACTOR BACKTESTS — {market.upper()} (Top 10, monthly rebal, 10bps cost)")
    print("=" * 110)

    equities: dict[str, pd.Series] = {}
    results = _backtest_all(strategies, stocks, equities)

    # Benchmark
    if bench is not None:
        eq_bench = bench / bench.iloc[0] * INITIAL_CAPITAL
        equities[BENCHMARK_LABEL] = eq_bench
        results.append(compute_stats(eq_bench, BENCHMARK_LABEL))

    # Print results table
    df = pd.DataFrame(results).set_index("name")
    df = df.sort_values("cagr", ascending=False)

    print(f"\n{'Strategy':<30} {'CAGR':>8} {'Sharpe':>8} {'Sortino':>8} {'MaxDD':>8} {'Calmar':>8} {'Total':>10}")
    print("-" * 90)
    for name, row in df.iterrows():
        flag = " ***" if name == BASELINE_LABEL else ""
        print(f"{name:<30} {row['cagr']:>+7.1%} {row['sharpe']:>8.2f} {row['sortino']:>8.2f} "
              f"{row['maxdd']:>+7.1%} {row['calmar']:>8.2f} {row['total']:>+9.0%}{flag}")

    # --- Identify factors that beat or match baseline ---
    baseline_cagr = df.loc[BASELINE_LABEL, "cagr"]
    winners = df[df["cagr"] >= baseline_cagr * 0.8].index.tolist()
    winners = [w for w in winners if w not in (BASELINE_LABEL, BENCHMARK_LABEL)]
    print(f"\nFactors within 80% of baseline CAGR ({baseline_cagr:.1%}): {winners}")

    # --- Test combinations of top performers ---
    print(f"\n{'='*110}")
    print(f" FACTOR COMBINATIONS — {market.upper()}")
    print(f"{'='*110}")

    # Get top single factors
    single_only = df.drop([BASELINE_LABEL, BENCHMARK_LABEL], errors="ignore")
    top_singles = single_only.nlargest(8, "cagr").index.tolist()
    print(f" Top 8 singles: {top_singles}\n")

    # Map names back to functions
    func_map = dict(strategies)
    combos = _build_combos(top_singles, func_map)

    # Run combo backtests
    combo_results = _backtest_all(combos, stocks, equities)

    combo_df = pd.DataFrame(combo_results).set_index("name")
    combo_df = combo_df.sort_values("cagr", ascending=False)

    print(f"\n{'Combo':<55} {'CAGR':>8} {'Sharpe':>8} {'Sortino':>8} {'MaxDD':>8} {'Calmar':>8}")
    print("-" * 105)
    for name, row in combo_df.iterrows():
        flag = " ***" if name == BASELINE_LABEL else ""
        print(f"{name:<55} {row['cagr']:>+7.1%} {row['sharpe']:>8.2f} {row['sortino']:>8.2f} "
              f"{row['maxdd']:>+7.1%} {row['calmar']:>8.2f}{flag}")

    # --- Yearly breakdown for top 3 combos ---
    top3 = combo_df.nlargest(3, "cagr").index.tolist()
    if BASELINE_LABEL not in top3:
        top3.append(BASELINE_LABEL)

    print(f"\n{'='*110}")
    print(f" YEARLY RETURNS — TOP STRATEGIES vs BASELINE — {market.upper()}")
    print(f"{'='*110}")

    yr_data = {name: yearly_returns(equities[name]) for name in top3}
    if bench is not None:
        yr_data[BENCHMARK_LABEL] = yearly_returns(equities[BENCHMARK_LABEL])

    col_names = top3 + ([BENCHMARK_LABEL] if bench is not None else [])
    _print_yearly_table(yr_data, col_names)

    # Compute period summaries
    _print_period_cagrs(equities, col_names, stocks.index[-1])


def main():
    """Parse ``--market`` (``us`` or ``cn``, default ``us``) and run it."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--market", default="us", choices=["us", "cn"])
    args = parser.parse_args()
    run(args.market)


if __name__ == "__main__":
    main()