""" End-to-end optimization study for the US recovery+momentum strategy family, run on a point-in-time (survivorship-bias-mitigated) S&P 500 universe. Experiments: E1 — Baseline drift: biased vs point-in-time universe, current top10 params. E2 — Hyperparameter sweep with 2016-2022 train / 2023-2026 test split. E3 — SPY MA200 regime filter (compare base vs filtered). E4 — Weighting schemes: equal vs inverse-vol vs rank. E5 — Ensemble of top-3 uncorrelated configs. Usage: uv run python -m research.optimize """ import os import numpy as np import pandas as pd import data_manager import research.pit_backtest as pit from research.strategies_plus import (EnsembleStrategy, RecoveryMomentumPlus, spy_ma200_filter) from strategies.recovery_momentum import RecoveryMomentumStrategy DATA_DIR = "data" BIASED_CSV = os.path.join(DATA_DIR, "us.csv") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def slice_period(df: pd.DataFrame, start: str | None, end: str | None) -> pd.DataFrame: out = df if start: out = out[out.index >= start] if end: out = out[out.index <= end] return out def run_strategy(strategy, prices, benchmark=None, regime_filter=None, fixed_fee: float = 0.0) -> pd.Series: return pit.backtest( strategy=strategy, prices=prices, initial_capital=10_000, transaction_cost=0.001, fixed_fee=fixed_fee, benchmark=benchmark, regime_filter=regime_filter, ) # --------------------------------------------------------------------------- # Experiment 1: bias drift # --------------------------------------------------------------------------- def exp1_bias_drift(pit_prices_masked: pd.DataFrame) -> pd.DataFrame: print("\n" + "=" * 90) print("E1 — Biased universe vs Point-in-time universe (recovery_mom_top10)") print("=" * 90) rows = [] # Biased: current 503 tickers extrapolated backward biased = pd.read_csv(BIASED_CSV, index_col=0, parse_dates=True) # Use same date range as PIT for a fair comparison common_start = max(biased.index[0], pit_prices_masked.index[0]) common_end = min(biased.index[-1], pit_prices_masked.index[-1]) biased_window = slice_period(biased, str(common_start.date()), str(common_end.date())) pit_window = slice_period(pit_prices_masked, str(common_start.date()), str(common_end.date())) # Drop non-ticker columns (SPY is in PIT but not in the masked tickers) biased_tickers = [c for c in biased_window.columns if c != "SPY"] pit_tickers = [c for c in pit_window.columns if c != "SPY"] # Use RecoveryMomentumPlus with identical defaults to recovery_mom_top10. # The original strategy uses na_option="bottom" which misranks NaN-masked # data (non-members appear "top"); the Plus variant uses na_option="keep". strat = RecoveryMomentumPlus(top_n=10) # defaults match RecoveryMomentumStrategy eq_biased = run_strategy(strat, biased_window[biased_tickers]) eq_pit = run_strategy(RecoveryMomentumPlus(top_n=10), pit_window[pit_tickers]) rows.append(pit.summarize(eq_biased, name="recovery_mom_top10 (BIASED)")) rows.append(pit.summarize(eq_pit, name="recovery_mom_top10 (POINT-IN-TIME)")) # Benchmark: SPY buy-and-hold in same window if "SPY" in biased_window.columns: spy_bh = (biased_window["SPY"] / biased_window["SPY"].iloc[0]) * 10_000 rows.append(pit.summarize(spy_bh, name="SPY buy-and-hold")) for r in rows: print(pit.fmt_row(r)) return pd.DataFrame(rows) # --------------------------------------------------------------------------- # Experiment 2: hyperparameter sweep with train/test split # --------------------------------------------------------------------------- def exp2_sweep(pit_masked: pd.DataFrame) -> pd.DataFrame: print("\n" + "=" * 90) print("E2 — Hyperparameter sweep (train: 2016-2022, test: 2023-2026)") print("=" * 90) tickers = [c for c in pit_masked.columns if c != "SPY"] prices = pit_masked[tickers] train = slice_period(prices, "2016-04-01", "2022-12-31") test = slice_period(prices, "2023-01-01", None) grid = [] for top_n in [5, 8, 10, 15]: for rec_win in [42, 63, 126]: for rec_w in [0.3, 0.5, 0.7]: for rebal in [10, 21]: grid.append(dict(top_n=top_n, recovery_window=rec_win, rec_weight=rec_w, rebal_freq=rebal)) results = [] for i, cfg in enumerate(grid): strat_train = RecoveryMomentumPlus(**cfg) eq_tr = run_strategy(strat_train, train) sum_tr = pit.summarize(eq_tr, name="train") strat_test = RecoveryMomentumPlus(**cfg) eq_te = run_strategy(strat_test, test) sum_te = pit.summarize(eq_te, name="test") results.append({ **cfg, "train_CAGR": sum_tr["CAGR"], "train_Sharpe": sum_tr["Sharpe"], "train_MaxDD": sum_tr["MaxDD"], "test_CAGR": sum_te["CAGR"], "test_Sharpe": sum_te["Sharpe"], "test_MaxDD": sum_te["MaxDD"], "test_Calmar": sum_te["Calmar"], }) if (i + 1) % 10 == 0 or i == len(grid) - 1: print(f" ... {i+1}/{len(grid)} configs evaluated") df = pd.DataFrame(results) df = df.sort_values("test_Sharpe", ascending=False) # Print top 10 by TEST Sharpe, then top 10 by TRAIN Sharpe to see overfit gap print("\n --- Top 10 by TEST Sharpe (out-of-sample, 2023-2026) ---") disp_cols = ["top_n", "recovery_window", "rec_weight", "rebal_freq", "train_Sharpe", "test_Sharpe", "train_CAGR", "test_CAGR", "test_MaxDD", "test_Calmar"] print(df.head(10)[disp_cols].to_string(index=False, formatters={"train_Sharpe": "{:.2f}".format, "test_Sharpe": "{:.2f}".format, "train_CAGR": "{:.1%}".format, "test_CAGR": "{:.1%}".format, "test_MaxDD": "{:.1%}".format, "test_Calmar": "{:.2f}".format})) print("\n --- Top 10 by TRAIN Sharpe (for comparison / overfit check) ---") df_tr = df.sort_values("train_Sharpe", ascending=False) print(df_tr.head(10)[disp_cols].to_string(index=False, formatters={"train_Sharpe": "{:.2f}".format, "test_Sharpe": "{:.2f}".format, "train_CAGR": "{:.1%}".format, "test_CAGR": "{:.1%}".format, "test_MaxDD": "{:.1%}".format, "test_Calmar": "{:.2f}".format})) return df # --------------------------------------------------------------------------- # Experiment 3: regime filter # --------------------------------------------------------------------------- def exp3_regime(pit_masked: pd.DataFrame) -> pd.DataFrame: print("\n" + "=" * 90) print("E3 — SPY MA200 regime filter (out-of-sample 2023-2026)") print("=" * 90) tickers = [c for c in pit_masked.columns if c != "SPY"] # Compute MA from FULL history so the filter is warmed up before 2023. spy_full = pit_masked["SPY"].dropna() if "SPY" in pit_masked.columns else None filt_full_200 = spy_ma200_filter(spy_full, ma_window=200) if spy_full is not None else None filt_full_150 = spy_ma200_filter(spy_full, ma_window=150) if spy_full is not None else None test = slice_period(pit_masked, "2023-01-01", None) prices = test[tickers] filt = filt_full_200.reindex(test.index).fillna(False).astype(bool) if filt_full_200 is not None else None filt_150 = filt_full_150.reindex(test.index).fillna(False).astype(bool) if filt_full_150 is not None else None rows = [] base = RecoveryMomentumPlus(top_n=10) rows.append(pit.summarize(run_strategy(base, prices), name="top10 (no filter)")) rows.append(pit.summarize(run_strategy(RecoveryMomentumPlus(top_n=10), prices, regime_filter=filt), name="top10 + SPY>MA200 filter")) rows.append(pit.summarize(run_strategy(RecoveryMomentumPlus(top_n=10), prices, regime_filter=filt_150), name="top10 + SPY>MA150 filter")) for r in rows: print(pit.fmt_row(r)) return pd.DataFrame(rows) # --------------------------------------------------------------------------- # Experiment 4: weighting schemes # --------------------------------------------------------------------------- def exp4_weighting(pit_masked: pd.DataFrame) -> pd.DataFrame: print("\n" + "=" * 90) print("E4 — Weighting schemes (out-of-sample 2023-2026, top_n=10)") print("=" * 90) tickers = [c for c in pit_masked.columns if c != "SPY"] test = slice_period(pit_masked[tickers], "2023-01-01", None) rows = [] for w in ["equal", "inv_vol", "rank"]: strat = RecoveryMomentumPlus(top_n=10, weighting=w) eq = run_strategy(strat, test) rows.append(pit.summarize(eq, name=f"top10 weighting={w}")) for r in rows: print(pit.fmt_row(r)) return pd.DataFrame(rows) # --------------------------------------------------------------------------- # Experiment 5: ensemble # --------------------------------------------------------------------------- def exp5_ensemble(pit_masked: pd.DataFrame, sweep_df: pd.DataFrame) -> pd.DataFrame: print("\n" + "=" * 90) print("E5 — Ensemble of 3 uncorrelated top configs (out-of-sample 2023-2026)") print("=" * 90) tickers = [c for c in pit_masked.columns if c != "SPY"] test = slice_period(pit_masked[tickers], "2023-01-01", None) # Pick top-20 by test_Sharpe, then greedily keep picks whose equity curves # correlate < 0.9 with already-kept picks. top20 = sweep_df.sort_values("test_Sharpe", ascending=False).head(20) curves = [] components = [] for _, row in top20.iterrows(): cfg = dict(top_n=int(row["top_n"]), recovery_window=int(row["recovery_window"]), rec_weight=float(row["rec_weight"]), rebal_freq=int(row["rebal_freq"])) strat = RecoveryMomentumPlus(**cfg) eq = run_strategy(strat, test) if any(eq.pct_change().corr(c.pct_change()) > 0.9 for c in curves): continue curves.append(eq) components.append((RecoveryMomentumPlus(**cfg), 1.0)) if len(components) >= 3: break print(f" Selected {len(components)} uncorrelated configs for ensemble:") for strat, _ in components: print(f" top_n={strat.top_n}, rec_win={strat.recovery_window}, " f"rec_w={strat.rec_weight}, rebal={strat.rebal_freq}") ens = EnsembleStrategy(components) eq_ens = run_strategy(ens, test) rows = [ pit.summarize(curves[i], name=f" component {i+1}") for i in range(len(curves)) ] rows.append(pit.summarize(eq_ens, name="ENSEMBLE (equal-weight)")) # Also ensemble + regime filter (compute MA from full history) if "SPY" in pit_masked.columns: spy_full = pit_masked["SPY"].dropna() filt = spy_ma200_filter(spy_full).reindex(test.index).fillna(False).astype(bool) eq_ens_reg = run_strategy(EnsembleStrategy(components), test, regime_filter=filt) rows.append(pit.summarize(eq_ens_reg, name="ENSEMBLE + SPY>MA200 filter")) for r in rows: print(pit.fmt_row(r)) return pd.DataFrame(rows) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): print("Loading point-in-time price data...") raw = pit.load_pit_prices() print(f" Raw (union) shape: {raw.shape}, {raw.index[0].date()} → {raw.index[-1].date()}") masked = pit.pit_universe(raw) # Sanity: how many ticker-days are masked out? total = masked.size valid = masked.notna().sum().sum() print(f" Point-in-time valid ticker-days: {valid:,} / {total:,} ({valid/total*100:.1f}%)") daily_universe = masked.notna().sum(axis=1) print(f" Universe size per day: min={daily_universe.min()}, median={int(daily_universe.median())}, max={daily_universe.max()}") e1 = exp1_bias_drift(masked) sweep = exp2_sweep(masked) e3 = exp3_regime(masked) e4 = exp4_weighting(masked) e5 = exp5_ensemble(masked, sweep) # Save sweep for inspection out = os.path.join(DATA_DIR, "research_sweep.csv") sweep.to_csv(out, index=False) print(f"\n Full sweep saved to {out}") if __name__ == "__main__": main()