The existing framework fetches today's S&P 500 constituents from Wikipedia
and applies that list to the entire 10-year price history — classic
survivorship bias. Stocks that went bankrupt or were removed for poor
performance are absent, while today's winners (which may have been minor
names 10 years ago) are implicitly selected. This materially inflates
reported strategy returns.
New pipeline:
- universe_history.py reconstructs per-ticker membership intervals by
walking Wikipedia's "Selected changes" table backward from today.
- research/fetch_historical.py downloads prices for all 848 tickers
that were ever members (Yahoo returns ~675 of them; ~170 fully
delisted names are unavailable — remaining partial bias).
- research/pit_backtest.py masks prices to NaN outside membership
windows so strategies naturally cannot select non-members.
- research/strategies_plus.py adds RecoveryMomentumPlus (generalized
Recovery+Momentum with configurable weighting / blend / regime hook)
and an EnsembleStrategy.
- research/optimize.py runs five experiments: bias drift, hyperparameter
sweep (2016-2022 train / 2023-2026 test), SPY MA regime filter,
weighting schemes, and an uncorrelated-config ensemble.
Headline finding: the biased backtest reports 40.9% CAGR for
recovery_mom_top10 over 2016-2026; the point-in-time version reports
22.4% (vs 14.0% SPY buy-and-hold). True edge is ~8pp CAGR, not ~27pp.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
126 lines
4.4 KiB
Python
126 lines
4.4 KiB
Python
"""
|
|
Point-in-time backtest runner.
|
|
|
|
Key idea: mask price data to NaN outside S&P 500 membership windows before
|
|
passing to the strategy. The strategy's signal computations then naturally
|
|
exclude non-members — no refactoring of strategies required.
|
|
|
|
Caveat: a stock joining the index has no signal for ~252 days after joining
|
|
(rolling windows need non-NaN warm-up). This is conservative but unbiased.
|
|
"""
|
|
|
|
import os
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
import metrics
|
|
import universe_history as uh
|
|
|
|
DATA_DIR = "data"
|
|
PIT_CSV = os.path.join(DATA_DIR, "us_pit.csv")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data loading
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_pit_prices() -> pd.DataFrame:
|
|
"""Load the full historical S&P 500 price matrix (delisted included)."""
|
|
if not os.path.exists(PIT_CSV):
|
|
raise FileNotFoundError(
|
|
f"{PIT_CSV} not found. Run `uv run python -m research.fetch_historical` first."
|
|
)
|
|
df = pd.read_csv(PIT_CSV, index_col=0, parse_dates=True)
|
|
return df.sort_index()
|
|
|
|
|
|
def pit_universe(prices: pd.DataFrame) -> pd.DataFrame:
|
|
"""Return prices masked to S&P 500 membership at each date (NaN outside)."""
|
|
intervals = uh.load_sp500_history()
|
|
return uh.mask_prices(prices, intervals)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Backtest engine (mirrors main.backtest but accepts masked prices)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def backtest(
|
|
strategy,
|
|
prices: pd.DataFrame,
|
|
initial_capital: float = 10_000,
|
|
transaction_cost: float = 0.001,
|
|
fixed_fee: float = 0.0,
|
|
benchmark: pd.Series | None = None,
|
|
regime_filter: pd.Series | None = None,
|
|
) -> pd.Series:
|
|
"""
|
|
Vectorized backtest with optional regime filter.
|
|
|
|
`regime_filter`: boolean series aligned to prices.index. True → be in the
|
|
market (use strategy weights). False → go to cash. When None, always invested.
|
|
"""
|
|
weights = strategy.generate_signals(prices)
|
|
weights = weights.reindex(prices.index).fillna(0.0)
|
|
|
|
if regime_filter is not None:
|
|
rf = regime_filter.reindex(prices.index).fillna(False).astype(float)
|
|
weights = weights.mul(rf, axis=0)
|
|
|
|
daily_returns = prices.pct_change().fillna(0.0)
|
|
portfolio_returns = (daily_returns * weights).sum(axis=1)
|
|
|
|
turnover = weights.diff().abs().sum(axis=1).fillna(0.0)
|
|
portfolio_returns -= turnover * transaction_cost
|
|
|
|
if fixed_fee > 0:
|
|
weight_changes = weights.diff().fillna(0.0)
|
|
n_trades = (weight_changes.abs() > 1e-8).sum(axis=1)
|
|
equity_running = (1 + portfolio_returns).cumprod() * initial_capital
|
|
fee_impact = (n_trades * fixed_fee) / equity_running.shift(1).fillna(initial_capital)
|
|
portfolio_returns -= fee_impact
|
|
|
|
equity = (1 + portfolio_returns).cumprod() * initial_capital
|
|
return equity
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Metrics helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def summarize(equity: pd.Series, name: str = "") -> dict:
|
|
"""Return a dict of key performance metrics (no printing)."""
|
|
eq = equity.dropna()
|
|
if len(eq) < 2:
|
|
return {"name": name, "error": "insufficient data"}
|
|
daily = eq.pct_change().dropna()
|
|
total_return = eq.iloc[-1] / eq.iloc[0] - 1
|
|
years = (eq.index[-1] - eq.index[0]).days / 365.25
|
|
cagr = (eq.iloc[-1] / eq.iloc[0]) ** (1 / years) - 1 if years > 0 else 0.0
|
|
vol = daily.std() * np.sqrt(252)
|
|
sharpe = (daily.mean() * 252) / vol if vol > 0 else 0.0
|
|
downside = daily[daily < 0].std() * np.sqrt(252)
|
|
sortino = (daily.mean() * 252) / downside if downside > 0 else 0.0
|
|
dd = (eq / eq.cummax() - 1).min()
|
|
calmar = cagr / abs(dd) if dd < 0 else 0.0
|
|
return {
|
|
"name": name,
|
|
"CAGR": cagr,
|
|
"Sharpe": sharpe,
|
|
"Sortino": sortino,
|
|
"MaxDD": dd,
|
|
"Calmar": calmar,
|
|
"TotalRet": total_return,
|
|
"Vol": vol,
|
|
}
|
|
|
|
|
|
def fmt_row(r: dict) -> str:
|
|
return (f" {r['name']:<38s} "
|
|
f"CAGR={r['CAGR']*100:>6.1f}% "
|
|
f"Sharpe={r['Sharpe']:>5.2f} "
|
|
f"Sortino={r['Sortino']:>5.2f} "
|
|
f"MaxDD={r['MaxDD']*100:>6.1f}% "
|
|
f"Calmar={r['Calmar']:>5.2f} "
|
|
f"Total={r['TotalRet']*100:>7.1f}%")
|