research: add US alpha exploration scripts

This commit is contained in:
2026-04-18 16:14:27 +08:00
parent 40ec3b828a
commit d0e8c97695
6 changed files with 1445 additions and 0 deletions

358
research/alpha_factors.py Normal file
View File

@@ -0,0 +1,358 @@
"""
Alpha factor library — price-only, academically motivated, with a rolling-IC
combiner, inverse-vol portfolio weighting, and volatility targeting.
Factors (each returns a cross-sectional DataFrame aligned to prices.index):
mom_12_1 12-1 month momentum (Jegadeesh & Titman 1993).
mom_7_1 Intermediate 7-1m momentum (Novy-Marx 2012).
mom_residual Market-residualized 12-1m (Blitz-Huij-Martens 2011).
rev_1m 1-month reversal × -1 (Jegadeesh 1990 / short-term reversal).
w52_high Price / 52-week high, proximity factor (George & Hwang 2004).
max5_neg -avg(top-5 daily returns past 21d) — lottery/MAX (Bali-Cakici-Whitelaw 2011).
idio_vol_neg -residual-vol from 60d market regression (Ang-Hodrick-Xing-Zhang 2006).
low_beta -60d market beta (Betting Against Beta, Frazzini-Pedersen 2014 variant).
trend_strength Slope / RMSE from 63d log-price regression.
recovery_63 Price / 63d low - 1 (project-native, V-rebound proxy).
Combiner:
- Cross-sectional percentile-rank each factor (NaN = keep).
- For each day, blend factors with weights proportional to the rolling
252-day Information Coefficient (Spearman rank corr vs forward 21d return).
- Weights are lagged by 21 days to avoid lookahead; negative-IC factors are
sign-flipped before weighting (so all contribute positively when confident).
Portfolio:
- Rank composite score, pick top_n (default 15) on a rebalance_freq schedule.
- Inverse-vol weight within top_n (60d realized vol).
- Volatility-target the whole portfolio to target_vol (default 18%) using a
trailing 60-day portfolio-vol estimate; exposure clipped to [0.3, 1.5].
- Shift(1) at the end for T-1 signal delivery, matching the project convention.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from strategies.base import Strategy
# ---------------------------------------------------------------------------
# Factor primitives
# ---------------------------------------------------------------------------
def _pct(p, n):
return p.pct_change(n, fill_method=None)
def f_mom_12_1(p):
return p.shift(21).pct_change(231, fill_method=None)
def f_mom_7_1(p):
return p.shift(21).pct_change(126, fill_method=None)
def f_rev_1m(p):
return -p.pct_change(21, fill_method=None)
def f_w52_high(p):
roll_max = p.rolling(252, min_periods=200).max()
return p / roll_max - 1 # ≤0, closer to 0 = near 52w high
def f_max5_neg(p):
ret = p.pct_change(fill_method=None)
# Mean of top-5 returns over the last 21 trading days; negate.
top5 = ret.rolling(21, min_periods=15).apply(
lambda x: np.mean(np.sort(x)[-5:]) if np.isfinite(x).sum() >= 5 else np.nan,
raw=True,
)
return -top5
def f_recovery_63(p):
return p / p.rolling(63, min_periods=60).min() - 1
def f_trend_strength(p):
"""
Vectorized log-price trend strength: rolling OLS slope ÷ residual RMSE on a
63-day window. t-stat-like measure of directional trend quality.
"""
logp = np.log(p.replace(0, np.nan))
n = 63
idx = np.arange(n, dtype=float)
idx_c = idx - idx.mean()
idx_var = (idx_c ** 2).sum()
# E[x·y] over the window: rolling sum of (idx·y) simplified via decomposition:
# Σ (i - ī)(y - ȳ) = Σ i·y - n·ī·ȳ (but ī is constant so just: Σ (i-ī)·y)
# We compute Σ (i-ī)·y as a rolling window-weighted sum.
weights = idx_c # shape (n,)
def rolling_weighted(series_df, w):
"""Σ_{k=0..n-1} w[k] * y[t-(n-1)+k] for each column, vectorized."""
arr = series_df.values
T, K = arr.shape
out = np.full_like(arr, np.nan, dtype=float)
# Convolution across time axis per column:
for k in range(K):
col = arr[:, k]
# Use np.convolve with reversed weights (equivalent to correlate)
conv = np.convolve(col, w[::-1], mode="valid")
out[n - 1:, k] = conv
return pd.DataFrame(out, index=series_df.index, columns=series_df.columns)
# rolling mean and var for log-price
roll_mean = logp.rolling(n, min_periods=n).mean()
# numerator: Σ (i-ī)(y - ȳ) = Σ (i-ī)·y (since Σ(i-ī) = 0)
num = rolling_weighted(logp.fillna(0.0), weights)
slope = num / idx_var
# Residual variance: Σ(y - ȳ)² / n - slope² * idx_var / n
var_y = logp.rolling(n, min_periods=n).var(ddof=0)
resid_var = (var_y - (slope ** 2) * idx_var / n).clip(lower=1e-18)
rmse = np.sqrt(resid_var)
ts = slope / rmse
# mask rows where the window contained any NaN
valid = logp.rolling(n, min_periods=n).count() == n
return ts.where(valid)
def _rolling_beta_and_residvol(p, mkt_ret, window=60):
"""Return (beta, residual_vol) DataFrames aligned to prices.index."""
ret = p.pct_change(fill_method=None)
mkt = mkt_ret.reindex(p.index)
def pair(stock_ret):
cov = stock_ret.rolling(window, min_periods=window).cov(mkt)
var = mkt.rolling(window, min_periods=window).var()
beta = cov / var
# Residual vol via: var(stock) - beta^2 * var(mkt) (simplification)
var_stock = stock_ret.rolling(window, min_periods=window).var()
resid_var = (var_stock - beta ** 2 * var) .clip(lower=0)
resid_vol = np.sqrt(resid_var)
return beta, resid_vol
betas = {}
resid_vols = {}
for col in ret.columns:
b, rv = pair(ret[col])
betas[col] = b
resid_vols[col] = rv
return pd.DataFrame(betas), pd.DataFrame(resid_vols)
def f_mom_residual(p, mkt_ret, betas=None, window=60):
if betas is None:
betas, _ = _rolling_beta_and_residvol(p, mkt_ret, window=window)
# 12-1m cumulative residual return = cum stock ret - beta * cum mkt ret.
# Reindex mkt_ret to p.index so arithmetic below does not produce a union
# index (which would corrupt downstream shape assumptions).
mkt_aligned = mkt_ret.reindex(p.index)
stock_cum = p.shift(21).pct_change(231, fill_method=None)
mkt_cum_ret = (1 + mkt_aligned).rolling(231).apply(lambda x: np.prod(x) - 1, raw=True)
mkt_cum = mkt_cum_ret.shift(21)
out = stock_cum.sub(betas.mul(mkt_cum, axis=0), fill_value=np.nan)
return out.reindex(p.index)
# ---------------------------------------------------------------------------
# Cross-sectional rank helper
# ---------------------------------------------------------------------------
def xsec_rank(df: pd.DataFrame) -> pd.DataFrame:
return df.rank(axis=1, pct=True, na_option="keep")
# ---------------------------------------------------------------------------
# Rolling IC computation
# ---------------------------------------------------------------------------
def rolling_ic(factor_rank: pd.DataFrame, fwd_ret: pd.DataFrame,
window: int = 252) -> pd.Series:
"""Daily Spearman IC = rank(factor) vs rank(fwd_ret); rolling mean."""
fr = fwd_ret.rank(axis=1, pct=True, na_option="keep")
# Per-day pearson corr of rank-transformed ≡ Spearman.
per_day_ic = factor_rank.corrwith(fr, axis=1)
return per_day_ic.rolling(window, min_periods=window // 2).mean()
def _rolling_ls_sharpe(factor_rank: pd.DataFrame,
prices: pd.DataFrame,
window: int = 252,
rebal: int = 21,
tcost: float = 0.001) -> pd.Series:
"""
Rolling realized Sharpe of a long-top-decile / short-bottom-decile portfolio
constructed on `factor_rank`, rebalanced every `rebal` trading days, with
proportional turnover cost `tcost`. Used as a factor-quality weight.
Returned series is aligned to `prices.index` and the Sharpe at day t is
computed from returns over [t-window, t].
"""
long_mask = factor_rank >= 0.9
short_mask = factor_rank <= 0.1
# Rebalance: hold the mask constant between rebal dates
rebal_mask = pd.Series(False, index=factor_rank.index)
rebal_mask.iloc[::rebal] = True
long_w = long_mask.astype(float).div(long_mask.sum(axis=1).replace(0, np.nan), axis=0)
short_w = short_mask.astype(float).div(short_mask.sum(axis=1).replace(0, np.nan), axis=0)
long_w[~rebal_mask] = np.nan
short_w[~rebal_mask] = np.nan
long_w = long_w.ffill().fillna(0.0)
short_w = short_w.ffill().fillna(0.0)
rets = prices.pct_change(fill_method=None)
long_ret = (long_w.shift(1) * rets).sum(axis=1)
short_ret = (short_w.shift(1) * rets).sum(axis=1)
long_turn = long_w.diff().abs().sum(axis=1).fillna(0.0)
short_turn = short_w.diff().abs().sum(axis=1).fillna(0.0)
ls_ret = (long_ret - short_ret) - (long_turn + short_turn) * tcost
ls_ret = ls_ret.fillna(0.0)
mean = ls_ret.rolling(window, min_periods=window // 2).mean()
std = ls_ret.rolling(window, min_periods=window // 2).std()
sharpe = (mean / std) * np.sqrt(252)
return sharpe
# ---------------------------------------------------------------------------
# Strategy
# ---------------------------------------------------------------------------
class AlphaFactorStrategy(Strategy):
"""
Multi-factor long-only with rolling LS-Sharpe-weighted signal blend,
inverse-vol weighting, and portfolio-level volatility targeting.
Why LS-Sharpe and not IC?
IC (rank-forward correlation) measures directional accuracy but ignores
the magnitude of cross-sectional dispersion. Two factors with identical
IC can have very different P&L. Empirically on this sample rev_1m has
IC t-stat +5 but LS Sharpe -12 — its top decile are freshly crashed
names that keep crashing. We weight by a lagged 252d rolling LS-Sharpe
(top-decile minus bottom-decile, monthly rebalance, 10bps t-cost) and
floor weights at zero so demoted factors simply drop out.
The strategy requires a market return series (e.g. SPY pct_change) passed
at construction time — it is NOT derived from data inside generate_signals,
because the cross-sectional universe contains only selected tickers while
we want a stable market benchmark for beta/residual computations.
"""
def __init__(
self,
mkt_returns: pd.Series,
top_n: int = 15,
rebal_freq: int = 10,
vol_window: int = 60,
vol_target_annual: float | None = 0.18,
ic_window: int = 252,
exposure_clip: tuple[float, float] = (0.30, 1.50),
fwd_window: int = 21,
weight_scheme: str = "ls_sharpe", # {"ls_sharpe", "ic", "equal"}
min_weight: float = 0.0, # floor per-factor weight (0 = drop losers)
):
self.mkt_returns = mkt_returns
self.top_n = top_n
self.rebal_freq = rebal_freq
self.vol_window = vol_window
self.vol_target_annual = vol_target_annual
self.ic_window = ic_window
self.exposure_clip = exposure_clip
self.fwd_window = fwd_window
self.weight_scheme = weight_scheme
self.min_weight = min_weight
# ---- Factor matrix ----
def compute_factors(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]:
betas, resid_vol = _rolling_beta_and_residvol(
data, self.mkt_returns, window=self.vol_window)
factors = {
"mom_12_1": f_mom_12_1(data),
"mom_7_1": f_mom_7_1(data),
"mom_residual": f_mom_residual(data, self.mkt_returns, betas=betas),
"rev_1m": f_rev_1m(data),
"w52_high": f_w52_high(data),
"max5_neg": f_max5_neg(data),
"recovery_63": f_recovery_63(data),
"trend_strength": f_trend_strength(data),
"idio_vol_neg": -resid_vol,
"low_beta": -betas,
}
return factors
# ---- Full pipeline ----
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
factors = self.compute_factors(data)
ranks = {k: xsec_rank(v) for k, v in factors.items()}
if self.weight_scheme == "ic":
fwd_ret = data.shift(-self.fwd_window) / data - 1
weight_series = {
k: rolling_ic(ranks[k], fwd_ret, window=self.ic_window).shift(self.fwd_window)
for k in ranks
}
elif self.weight_scheme == "ls_sharpe":
weight_series = {
k: _rolling_ls_sharpe(ranks[k], data,
window=self.ic_window,
rebal=21, tcost=0.001).shift(self.fwd_window)
for k in ranks
}
elif self.weight_scheme == "equal":
weight_series = {k: pd.Series(1.0, index=ranks[k].index) for k in ranks}
else:
raise ValueError(f"unknown weight_scheme {self.weight_scheme!r}")
composite = None
weight_norm = None
for k, rk in ranks.items():
w = weight_series[k].reindex(rk.index).fillna(0.0)
if self.min_weight is not None:
w = w.where(w > self.min_weight, 0.0)
contrib = rk.mul(w, axis=0)
composite = contrib if composite is None else composite.add(contrib, fill_value=0.0)
abs_w = w.abs()
weight_norm = abs_w if weight_norm is None else weight_norm.add(abs_w, fill_value=0)
weight_norm = weight_norm.replace(0, np.nan)
composite = composite.div(weight_norm, axis=0)
# Top-N selection.
sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom")
n_valid = composite.notna().sum(axis=1)
enough = n_valid >= self.top_n
top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1)
# Inverse-vol weighting within top_n.
rets = data.pct_change(fill_method=None)
vol = rets.rolling(self.vol_window, min_periods=self.vol_window).std()
inv_vol = (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0)
row_sums = inv_vol.sum(axis=1).replace(0, np.nan)
weights = inv_vol.div(row_sums, axis=0).fillna(0.0)
# Rebalance schedule.
warmup = max(252, self.vol_window + 21, self.ic_window + self.fwd_window)
rebal_mask = pd.Series(False, index=data.index)
rebal_idx = list(range(warmup, len(data), self.rebal_freq))
rebal_mask.iloc[rebal_idx] = True
weights[~rebal_mask] = np.nan
weights = weights.ffill().fillna(0.0)
weights.iloc[:warmup] = 0.0
# Volatility targeting at the portfolio level.
if self.vol_target_annual is not None:
# Use returns of the *current* weight vector; vol is trailing realized
# on the applied weights so no lookahead. Compute after ffill.
port_rets = (weights.shift(1) * rets).sum(axis=1)
port_vol = port_rets.rolling(self.vol_window,
min_periods=self.vol_window).std() * np.sqrt(252)
scale = (self.vol_target_annual / port_vol).clip(*self.exposure_clip)
scale = scale.fillna(method="ffill").fillna(1.0)
weights = weights.mul(scale, axis=0)
return weights.shift(1).fillna(0.0)

279
research/alpha_research.py Normal file
View File

@@ -0,0 +1,279 @@
"""
Professional QR-style factor research on the PIT S&P 500 universe.
Stage 1 — Factor diagnostics.
IC (Spearman, 21d fwd), t-stat, realistic long-short decile backtest
(monthly rebalance, 10 bps t-cost).
Stage 2 — Composite backtest 1/3/5/10y vs champions.
For 1y window we pre-pend 2y of warmup then score returns on the last 1y
only, so strategies with 252d+ warmup are actually active in-window.
Stage 3 — Config sweep across weight_scheme × top_n × rebal × vol_target.
Outputs CSVs to data/alpha_research_*.csv.
"""
from __future__ import annotations
import os
import warnings
import numpy as np
import pandas as pd
import research.pit_backtest as pit
from research.alpha_factors import (AlphaFactorStrategy, _rolling_beta_and_residvol,
f_mom_12_1, f_mom_7_1, f_rev_1m, f_w52_high,
f_max5_neg, f_recovery_63, f_trend_strength,
xsec_rank, _rolling_ls_sharpe)
from strategies.factor_combo import FactorComboStrategy
from strategies.recovery_momentum import RecoveryMomentumStrategy
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
DATA_DIR = "data"
BENCHMARK = "SPY"
def load():
raw = pit.load_pit_prices()
masked = pit.pit_universe(raw)
if BENCHMARK in raw.columns:
masked[BENCHMARK] = raw[BENCHMARK]
return masked
def warmup_slice(df: pd.DataFrame, years: int, warmup_days: int = 500) -> tuple[pd.DataFrame, pd.Timestamp]:
"""Return (prices_with_warmup, measurement_start). Strategies are fed the
longer series, but metrics must be computed only from measurement_start."""
measurement_start = df.index[-1] - pd.DateOffset(years=years)
first_day = df.index[0]
# Keep all rows between measurement_start - warmup_days and end.
cutoff = max(first_day, measurement_start - pd.Timedelta(days=warmup_days * 1.5))
sliced = df[df.index >= cutoff]
return sliced, measurement_start
def measure(eq: pd.Series, start: pd.Timestamp, name: str = "") -> dict:
eq = eq[eq.index >= start]
# Re-base to 10_000 at start
eq = eq / eq.iloc[0] * 10_000
return pit.summarize(eq, name=name)
# ---------------------------------------------------------------------------
# Stage 1 — Factor diagnostics
# ---------------------------------------------------------------------------
def factor_diagnostics(masked: pd.DataFrame):
print("\n" + "=" * 110)
print("Stage 1 — Factor diagnostics (full 10y PIT, monthly rebal, 10bps t-cost)")
print("=" * 110)
tickers = [c for c in masked.columns if c != BENCHMARK]
prices = masked[tickers]
mkt_ret = masked[BENCHMARK].pct_change(fill_method=None)
betas, resid_vol = _rolling_beta_and_residvol(prices, mkt_ret, window=60)
from research.alpha_factors import f_mom_residual
factor_builders = {
"mom_12_1": lambda: f_mom_12_1(prices),
"mom_7_1": lambda: f_mom_7_1(prices),
"mom_residual": lambda: f_mom_residual(prices, mkt_ret, betas=betas),
"rev_1m": lambda: f_rev_1m(prices),
"w52_high": lambda: f_w52_high(prices),
"max5_neg": lambda: f_max5_neg(prices),
"recovery_63": lambda: f_recovery_63(prices),
"trend_strength": lambda: f_trend_strength(prices),
"idio_vol_neg": lambda: -resid_vol,
"low_beta": lambda: -betas,
}
fwd_21 = prices.shift(-21) / prices - 1
fwd_rank = fwd_21.rank(axis=1, pct=True, na_option="keep")
rows = []
for name, build in factor_builders.items():
fac = build()
fr = xsec_rank(fac)
ic_daily = fr.corrwith(fwd_rank, axis=1).dropna()
ic_mean = ic_daily.mean()
ic_t = ic_mean / (ic_daily.std() / np.sqrt(len(ic_daily))) if len(ic_daily) > 1 else 0.0
ls = realistic_decile_spread(fr, prices, rebal=21, tcost=0.001)
long_only = realistic_top_decile(fr, prices, rebal=21, tcost=0.001)
rows.append({
"factor": name,
"IC_mean": ic_mean, "IC_t": ic_t,
"LS_CAGR": ls["CAGR"], "LS_Sharpe": ls["Sharpe"],
"LO_CAGR": long_only["CAGR"], "LO_Sharpe": long_only["Sharpe"],
"LO_MaxDD": long_only["MaxDD"],
})
df = pd.DataFrame(rows).sort_values("LO_Sharpe", ascending=False)
df.to_csv(os.path.join(DATA_DIR, "alpha_research_factors.csv"), index=False)
print(df.to_string(index=False, formatters={
"IC_mean": "{:+.4f}".format, "IC_t": "{:+.2f}".format,
"LS_CAGR": "{:+.1%}".format, "LS_Sharpe": "{:+.2f}".format,
"LO_CAGR": "{:+.1%}".format, "LO_Sharpe": "{:+.2f}".format,
"LO_MaxDD": "{:.1%}".format,
}))
return df
def realistic_decile_spread(factor_rank, prices, rebal=21, tcost=0.001):
"""Long top-decile minus short bottom-decile, monthly rebal, 10bps t-cost."""
long_mask = factor_rank >= 0.9
short_mask = factor_rank <= 0.1
long_w = long_mask.astype(float).div(long_mask.sum(axis=1).replace(0, np.nan), axis=0)
short_w = short_mask.astype(float).div(short_mask.sum(axis=1).replace(0, np.nan), axis=0)
rebal_mask = pd.Series(False, index=factor_rank.index)
rebal_mask.iloc[::rebal] = True
long_w[~rebal_mask] = np.nan
short_w[~rebal_mask] = np.nan
long_w = long_w.ffill().fillna(0.0)
short_w = short_w.ffill().fillna(0.0)
rets = prices.pct_change(fill_method=None)
ls = ((long_w.shift(1) * rets).sum(axis=1)
- (short_w.shift(1) * rets).sum(axis=1)) \
- (long_w.diff().abs().sum(axis=1).fillna(0.0)
+ short_w.diff().abs().sum(axis=1).fillna(0.0)) * tcost
ls = ls.fillna(0.0).iloc[252:]
eq = (1 + ls).cumprod() * 10_000
return pit.summarize(eq, name="ls")
def realistic_top_decile(factor_rank, prices, rebal=21, tcost=0.001):
"""Long-only top-decile equal-weight portfolio with t-cost."""
long_mask = factor_rank >= 0.9
long_w = long_mask.astype(float).div(long_mask.sum(axis=1).replace(0, np.nan), axis=0)
rebal_mask = pd.Series(False, index=factor_rank.index)
rebal_mask.iloc[::rebal] = True
long_w[~rebal_mask] = np.nan
long_w = long_w.ffill().fillna(0.0)
rets = prices.pct_change(fill_method=None)
port_ret = (long_w.shift(1) * rets).sum(axis=1) \
- long_w.diff().abs().sum(axis=1).fillna(0.0) * tcost
port_ret = port_ret.fillna(0.0).iloc[252:]
eq = (1 + port_ret).cumprod() * 10_000
return pit.summarize(eq, name="lo")
# ---------------------------------------------------------------------------
# Stage 2 — Composite backtest
# ---------------------------------------------------------------------------
def composite_backtest(masked: pd.DataFrame):
print("\n" + "=" * 110)
print("Stage 2 — IC / LS-Sharpe-weighted composite vs champions (1/3/5/10y)")
print("=" * 110)
tickers = [c for c in masked.columns if c != BENCHMARK]
mkt_ret_full = masked[BENCHMARK].pct_change(fill_method=None)
configs = {
"Alpha(LS-Sharpe, tn=15, rebal=10)":
lambda: AlphaFactorStrategy(mkt_ret_full, top_n=15, rebal_freq=10,
vol_target_annual=None, weight_scheme="ls_sharpe"),
"Alpha(LS-Sharpe, tn=15, rebal=21)":
lambda: AlphaFactorStrategy(mkt_ret_full, top_n=15, rebal_freq=21,
vol_target_annual=None, weight_scheme="ls_sharpe"),
"Alpha(LS-Sharpe+VT18, tn=15, rebal=21)":
lambda: AlphaFactorStrategy(mkt_ret_full, top_n=15, rebal_freq=21,
vol_target_annual=0.18, weight_scheme="ls_sharpe"),
"Alpha(IC, tn=15, rebal=21)":
lambda: AlphaFactorStrategy(mkt_ret_full, top_n=15, rebal_freq=21,
vol_target_annual=None, weight_scheme="ic"),
"Recovery+Mom Top10": lambda: RecoveryMomentumStrategy(top_n=10),
"fc_up_cap+mom_gap": lambda: FactorComboStrategy("up_cap+mom_gap",
rebal_freq=21, top_n=10),
}
all_rows = []
for years in (10, 5, 3, 1):
sliced, measurement_start = warmup_slice(masked, years, warmup_days=500)
prices = sliced[tickers]
print(f"\n --- Window: last {years}y "
f"(measure {measurement_start.date()}{sliced.index[-1].date()}, "
f"warmup from {sliced.index[0].date()}) ---")
spy = sliced[BENCHMARK].dropna()
spy_eq = (spy / spy.iloc[0]) * 10_000
rows = [{"years": years, "strategy": "SPY buy-and-hold",
**{k: v for k, v in measure(spy_eq, measurement_start, "").items()
if k != "name"}}]
for name, factory in configs.items():
strat = factory()
eq = pit.backtest(strategy=strat, prices=prices,
initial_capital=10_000, transaction_cost=0.001)
m = measure(eq, measurement_start, "")
rows.append({"years": years, "strategy": name,
**{k: v for k, v in m.items() if k != "name"}})
for r in rows:
print(f" {r['strategy']:<42s} "
f"CAGR={r['CAGR']*100:>6.1f}% "
f"Sharpe={r['Sharpe']:>5.2f} "
f"Sortino={r['Sortino']:>5.2f} "
f"MaxDD={r['MaxDD']*100:>6.1f}% "
f"Calmar={r['Calmar']:>5.2f}")
all_rows.extend(rows)
df = pd.DataFrame(all_rows)
df.to_csv(os.path.join(DATA_DIR, "alpha_research_composite.csv"), index=False)
return df
# ---------------------------------------------------------------------------
# Stage 3 — Config sweep
# ---------------------------------------------------------------------------
def config_sweep(masked: pd.DataFrame):
print("\n" + "=" * 110)
print("Stage 3 — AlphaFactor config sweep (10y)")
print("=" * 110)
tickers = [c for c in masked.columns if c != BENCHMARK]
prices = masked[tickers]
mkt_ret = masked[BENCHMARK].pct_change(fill_method=None)
rows = []
for scheme in ("ls_sharpe", "ic", "equal"):
for top_n in (10, 15, 20):
for rebal in (10, 21):
for vt in (None, 0.18):
strat = AlphaFactorStrategy(mkt_ret, top_n=top_n, rebal_freq=rebal,
vol_target_annual=vt,
weight_scheme=scheme)
eq = pit.backtest(strat, prices, initial_capital=10_000,
transaction_cost=0.001)
s = pit.summarize(eq, "")
rows.append({"scheme": scheme, "top_n": top_n, "rebal": rebal,
"vt": vt if vt is not None else "none",
"CAGR": s["CAGR"], "Sharpe": s["Sharpe"],
"MaxDD": s["MaxDD"], "Calmar": s["Calmar"]})
df = pd.DataFrame(rows).sort_values("Sharpe", ascending=False)
df.to_csv(os.path.join(DATA_DIR, "alpha_research_sweep.csv"), index=False)
print(df.head(15).to_string(index=False, formatters={
"CAGR": "{:.1%}".format, "Sharpe": "{:.2f}".format,
"MaxDD": "{:.1%}".format, "Calmar": "{:.2f}".format,
}))
return df
def main():
print("Loading PIT data…")
masked = load()
print(f" shape={masked.shape} range={masked.index[0].date()}{masked.index[-1].date()}")
factor_diagnostics(masked)
composite_backtest(masked)
sweep = config_sweep(masked)
print("\n" + "=" * 110)
print("Top 5 configs:")
print("=" * 110)
print(sweep.head(5).to_string(index=False, formatters={
"CAGR": "{:.1%}".format, "Sharpe": "{:.2f}".format,
"MaxDD": "{:.1%}".format, "Calmar": "{:.2f}".format,
}))
if __name__ == "__main__":
main()

294
research/factor_optimize.py Normal file
View File

@@ -0,0 +1,294 @@
"""
Factor-level optimization on the point-in-time S&P 500 universe.
Builds on the sweep results in data/sweep_*.csv. Runs four experiments:
O1 — RecoveryMomentumPlus hyperparameter grid (top_n × rec_window × rec_weight × rebal),
with 2016-2022 train / 2023-2026 test split. Picks best by Sharpe-on-test.
O2 — SPY>MA regime filter applied to the 3 highest-Sharpe strategies (10y window).
O3 — Top-3 uncorrelated ensemble: greedy corr<0.85 selection → equal-weight blend.
O4 — Factor-mix parameter sweep on the FactorCombo "up_cap+mom_gap" signal
(top_n × rebal_freq).
All experiments run on PIT-masked data. Results printed + written to
data/factor_optimize_<exp>.csv.
Usage:
uv run python -m research.factor_optimize
"""
from __future__ import annotations
import os
import warnings
import numpy as np
import pandas as pd
import research.pit_backtest as pit
from research.strategies_plus import (EnsembleStrategy, RecoveryMomentumPlus,
spy_ma200_filter)
from strategies.factor_combo import FactorComboStrategy, SIGNAL_REGISTRY
from strategies.momentum_quality import MomentumQualityStrategy
from strategies.recovery_momentum import RecoveryMomentumStrategy
warnings.filterwarnings("ignore", category=FutureWarning)
DATA_DIR = "data"
BENCHMARK = "SPY"
def load_masked_prices():
raw = pit.load_pit_prices()
masked = pit.pit_universe(raw)
if BENCHMARK in raw.columns:
masked[BENCHMARK] = raw[BENCHMARK]
return masked
def slice_period(df, start=None, end=None):
out = df
if start:
out = out[out.index >= start]
if end:
out = out[out.index <= end]
return out
def run(strat, prices, *, regime_filter=None):
return pit.backtest(
strategy=strat, prices=prices, initial_capital=10_000,
transaction_cost=0.001, regime_filter=regime_filter,
)
# ---------------------------------------------------------------------------
# O1 — RecoveryMomentumPlus hyperparameter grid
# ---------------------------------------------------------------------------
def o1_hyperparam_sweep(masked):
print("\n" + "=" * 100)
print("O1 — RecoveryMomentumPlus sweep (train 2016-2022 / test 2023-2026)")
print("=" * 100)
tickers = [c for c in masked.columns if c != BENCHMARK]
prices = masked[tickers]
train = slice_period(prices, "2016-04-19", "2022-12-31")
test = slice_period(prices, "2023-01-01", None)
grid = []
for top_n in (5, 10, 15, 20):
for rec_win in (42, 63, 126):
for rec_w in (0.3, 0.5, 0.7):
for rebal in (5, 10, 21):
grid.append((top_n, rec_win, rec_w, rebal))
rows = []
for i, (top_n, rec_win, rec_w, rebal) in enumerate(grid, 1):
cfg = dict(top_n=top_n, recovery_window=rec_win,
rec_weight=rec_w, rebal_freq=rebal)
tr = pit.summarize(run(RecoveryMomentumPlus(**cfg), train), "")
te = pit.summarize(run(RecoveryMomentumPlus(**cfg), test), "")
rows.append({**cfg,
"train_CAGR": tr["CAGR"], "train_Sharpe": tr["Sharpe"],
"test_CAGR": te["CAGR"], "test_Sharpe": te["Sharpe"],
"test_MaxDD": te["MaxDD"], "test_Calmar": te["Calmar"]})
if i % 12 == 0 or i == len(grid):
print(f"{i}/{len(grid)} configs evaluated")
df = pd.DataFrame(rows).sort_values("test_Sharpe", ascending=False)
out = os.path.join(DATA_DIR, "factor_optimize_O1.csv")
df.to_csv(out, index=False)
print("\n --- Top 10 by out-of-sample Sharpe (2023-2026) ---")
disp = ["top_n", "recovery_window", "rec_weight", "rebal_freq",
"train_Sharpe", "test_Sharpe", "train_CAGR", "test_CAGR",
"test_MaxDD", "test_Calmar"]
print(df.head(10)[disp].to_string(index=False, formatters={
"train_Sharpe": "{:.2f}".format, "test_Sharpe": "{:.2f}".format,
"train_CAGR": "{:.1%}".format, "test_CAGR": "{:.1%}".format,
"test_MaxDD": "{:.1%}".format, "test_Calmar": "{:.2f}".format,
}))
return df
# ---------------------------------------------------------------------------
# O2 — Regime filter on the top strategies
# ---------------------------------------------------------------------------
def o2_regime(masked):
print("\n" + "=" * 100)
print("O2 — SPY > MA regime filter on top strategies (full 10y PIT)")
print("=" * 100)
tickers = [c for c in masked.columns if c != BENCHMARK]
prices = masked[tickers]
spy_full = masked[BENCHMARK].dropna()
contenders = {
"Recovery+Mom Top10": RecoveryMomentumStrategy(top_n=10),
"fc_up_cap_mom_gap_monthly": FactorComboStrategy("up_cap+mom_gap",
rebal_freq=21, top_n=10),
"fc_rec63_mom_gap_monthly": FactorComboStrategy("rec63+mom_gap",
rebal_freq=21, top_n=10),
}
rows = []
for name, strat in contenders.items():
base = run(strat, prices)
rows.append({"strategy": name, "filter": "none",
**{k: v for k, v in pit.summarize(base, "").items() if k != "name"}})
for ma in (200, 150, 100):
filt = spy_ma200_filter(spy_full, ma_window=ma).reindex(prices.index).fillna(False)
strat_fresh = _fresh_copy(strat)
eq = run(strat_fresh, prices, regime_filter=filt)
rows.append({"strategy": name, "filter": f"SPY>MA{ma}",
**{k: v for k, v in pit.summarize(eq, "").items() if k != "name"}})
df = pd.DataFrame(rows)
df.to_csv(os.path.join(DATA_DIR, "factor_optimize_O2.csv"), index=False)
print(f" {'strategy':<32s} {'filter':<12s} {'CAGR':>7s} {'Sharpe':>7s} "
f"{'MaxDD':>7s} {'Calmar':>7s}")
for _, r in df.iterrows():
print(f" {r['strategy']:<32s} {r['filter']:<12s} "
f"{r['CAGR']*100:>6.1f}% {r['Sharpe']:>7.2f} "
f"{r['MaxDD']*100:>6.1f}% {r['Calmar']:>7.2f}")
return df
def _fresh_copy(strat):
"""Re-instantiate a strategy so state (if any) is reset between backtests."""
if isinstance(strat, RecoveryMomentumStrategy):
return RecoveryMomentumStrategy(
recovery_window=strat.recovery_window, mom_lookback=strat.mom_lookback,
mom_skip=strat.mom_skip, rebal_freq=strat.rebal_freq, top_n=strat.top_n)
if isinstance(strat, FactorComboStrategy):
return FactorComboStrategy(strat.signal_name, rebal_freq=strat.rebal_freq,
top_n=strat.top_n)
if isinstance(strat, MomentumQualityStrategy):
return MomentumQualityStrategy(
momentum_period=strat.momentum_period, skip=strat.skip,
quality_window=strat.quality_window, top_n=strat.top_n)
return strat # already stateless for our uses
# ---------------------------------------------------------------------------
# O3 — Uncorrelated ensemble
# ---------------------------------------------------------------------------
def o3_ensemble(masked):
print("\n" + "=" * 100)
print("O3 — Greedy uncorrelated ensemble (full 10y PIT)")
print("=" * 100)
tickers = [c for c in masked.columns if c != BENCHMARK]
prices = masked[tickers]
spy_full = masked[BENCHMARK].dropna()
# Candidate pool: the production strategies that cleared 0.75 Sharpe in 10y sweep.
candidates: list[tuple[str, object]] = [
("Recovery+Mom Top10", RecoveryMomentumStrategy(top_n=10)),
("fc_up_cap_mom_gap_monthly", FactorComboStrategy("up_cap+mom_gap", 21, 10)),
("fc_rec63_mom_gap_monthly", FactorComboStrategy("rec63+mom_gap", 21, 10)),
("fc_up_cap_quality_mom_monthly", FactorComboStrategy("up_cap+quality_mom", 21, 10)),
("fc_rec_mfilt_deep_upvol_monthly", FactorComboStrategy("rec_mfilt+deep_upvol", 21, 10)),
("fc_mom7m_rec126_monthly", FactorComboStrategy("mom7m+rec126", 21, 10)),
("Recovery+Mom Top20", RecoveryMomentumStrategy(top_n=20)),
("fc_down_resil_qual_mom_monthly", FactorComboStrategy("down_resil+qual_mom", 21, 10)),
]
equities: dict[str, pd.Series] = {name: run(s, prices) for name, s in candidates}
returns = pd.DataFrame({n: eq.pct_change().fillna(0) for n, eq in equities.items()})
sharpes = {n: pit.summarize(eq, n)["Sharpe"] for n, eq in equities.items()}
order = sorted(candidates, key=lambda t: sharpes[t[0]], reverse=True)
picked_names: list[str] = []
picked: list[tuple[object, float]] = []
for name, strat in order:
if any(returns[name].corr(returns[p]) > 0.85 for p in picked_names):
continue
picked_names.append(name)
picked.append((strat, 1.0))
if len(picked) >= 3:
break
print(f" Selected {len(picked)} uncorrelated components:")
for name in picked_names:
print(f" - {name} (Sharpe={sharpes[name]:.2f})")
ens = EnsembleStrategy(picked)
eq_ens = run(ens, prices)
filt = spy_ma200_filter(spy_full).reindex(prices.index).fillna(False)
eq_ens_reg = run(EnsembleStrategy(picked), prices, regime_filter=filt)
spy_bh = (masked[BENCHMARK].dropna().pipe(lambda s: s / s.iloc[0] * 10_000))
rows = [pit.summarize(spy_bh, "SPY buy-and-hold")]
for name in picked_names:
rows.append(pit.summarize(equities[name], f" component: {name}"))
rows.append(pit.summarize(eq_ens, "ENSEMBLE (equal-weight, no filter)"))
rows.append(pit.summarize(eq_ens_reg, "ENSEMBLE + SPY>MA200 filter"))
for r in rows:
print(pit.fmt_row(r))
df = pd.DataFrame(rows)
df.to_csv(os.path.join(DATA_DIR, "factor_optimize_O3.csv"), index=False)
return df, picked_names
# ---------------------------------------------------------------------------
# O4 — FactorCombo up_cap+mom_gap: top_n × rebal sweep
# ---------------------------------------------------------------------------
def o4_factorcombo_sweep(masked):
print("\n" + "=" * 100)
print("O4 — FactorCombo up_cap+mom_gap: top_n × rebal (full 10y PIT)")
print("=" * 100)
tickers = [c for c in masked.columns if c != BENCHMARK]
prices = masked[tickers]
rows = []
for top_n in (5, 8, 10, 15, 20, 30):
for rebal in (5, 10, 21, 42):
strat = FactorComboStrategy("up_cap+mom_gap", rebal_freq=rebal, top_n=top_n)
eq = run(strat, prices)
s = pit.summarize(eq, f"top_n={top_n} rebal={rebal}")
rows.append({"top_n": top_n, "rebal": rebal,
"CAGR": s["CAGR"], "Sharpe": s["Sharpe"],
"MaxDD": s["MaxDD"], "Calmar": s["Calmar"]})
df = pd.DataFrame(rows).sort_values("Sharpe", ascending=False)
df.to_csv(os.path.join(DATA_DIR, "factor_optimize_O4.csv"), index=False)
print(f" {'top_n':<8s}{'rebal':<8s}{'CAGR':>8s}{'Sharpe':>9s}"
f"{'MaxDD':>9s}{'Calmar':>9s}")
for _, r in df.iterrows():
print(f" {int(r['top_n']):<8d}{int(r['rebal']):<8d}"
f"{r['CAGR']*100:>7.1f}%{r['Sharpe']:>9.2f}"
f"{r['MaxDD']*100:>8.1f}%{r['Calmar']:>9.2f}")
return df
def main():
print("Loading PIT-masked price data…")
masked = load_masked_prices()
print(f" shape={masked.shape} range={masked.index[0].date()}{masked.index[-1].date()}")
o1 = o1_hyperparam_sweep(masked)
o2 = o2_regime(masked)
o3, picks = o3_ensemble(masked)
o4 = o4_factorcombo_sweep(masked)
print("\n" + "=" * 100)
print("Summary: best config from each experiment")
print("=" * 100)
best_o1 = o1.iloc[0]
print(f" O1 best OOS Sharpe: top_n={int(best_o1['top_n'])} rec_win={int(best_o1['recovery_window'])} "
f"rec_w={best_o1['rec_weight']} rebal={int(best_o1['rebal_freq'])} "
f"→ test Sharpe={best_o1['test_Sharpe']:.2f} test CAGR={best_o1['test_CAGR']*100:.1f}%")
best_o4 = o4.iloc[0]
print(f" O4 best overall: top_n={int(best_o4['top_n'])} rebal={int(best_o4['rebal'])} "
f"Sharpe={best_o4['Sharpe']:.2f} CAGR={best_o4['CAGR']*100:.1f}% "
f"Calmar={best_o4['Calmar']:.2f}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,233 @@
"""
Interaction / multiplicative factor strategy.
Rationale: in the 10y PIT diagnostics, each single-factor top decile clocks
~0.50.8 Sharpe, yet the production Recovery+Mom Top10 delivers 0.92. The
extra alpha comes from an AND-style interaction — stocks that rank high on
BOTH factors simultaneously. Linear rank-blending loses this because a stock
can make top_n by being middling on many factors instead of extreme on a few.
This module provides:
* `MultiplicativeFactorStrategy` — picks top_n stocks by the geometric mean
(equivalently the product) of cross-sectional factor ranks. Concentrates
on consensus winners.
* `VotingFactorStrategy` — counts how many factors place a stock in its
top `vote_pct`; selects stocks clearing a minimum vote threshold. Breaks
ties by the sum of ranks. Robust when factor ICs drift.
* `SubStrategyEnsemble` — equal-weight blend of Recovery+Mom Top10,
fc_up_cap+mom_gap monthly, and a new Multiplicative("mom × recovery ×
idio_vol_neg") sleeve. Diversifies across independent alpha sources
rather than across factor primitives.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from research.alpha_factors import (_rolling_beta_and_residvol, f_mom_12_1,
f_mom_7_1, f_rev_1m, f_w52_high, f_max5_neg,
f_recovery_63, f_trend_strength, xsec_rank,
f_mom_residual)
from strategies.base import Strategy
from strategies.factor_combo import FactorComboStrategy
from strategies.recovery_momentum import RecoveryMomentumStrategy
# ---------------------------------------------------------------------------
# Multiplicative top-N
# ---------------------------------------------------------------------------
class MultiplicativeFactorStrategy(Strategy):
"""
Top-N by product of selected factor ranks (equivalent to rank-geometric-mean).
Parameters
----------
factor_names : list[str]
Keys into the factor library. Supported:
mom_12_1, mom_7_1, mom_residual, recovery_63, w52_high,
idio_vol_neg, mom_x_recovery (shortcut pair).
top_n : int
Number of stocks.
rebal_freq : int
Rebal interval in trading days.
mkt_returns : pd.Series | None
Required for mom_residual / idio_vol_neg.
"""
def __init__(self, factor_names: list[str], top_n: int = 10,
rebal_freq: int = 21, mkt_returns: pd.Series | None = None,
weighting: str = "equal", signal_concentration: float = 0.0,
dispersion_scale: bool = False):
"""
Parameters
----------
signal_concentration : float
Exponent applied to composite score when weighting=='signal'.
0 → equal weight within top_n; higher → more weight on top ranks.
dispersion_scale : bool
Scale total exposure by z-scored cross-sectional rank dispersion,
clipped to [0.5, 1.3]. Expands in high-dispersion regimes.
"""
self.factor_names = factor_names
self.top_n = top_n
self.rebal_freq = rebal_freq
self.mkt_returns = mkt_returns
self.weighting = weighting
self.signal_concentration = signal_concentration
self.dispersion_scale = dispersion_scale
def _build(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]:
betas, resid_vol = (None, None)
if any(f in ("mom_residual", "idio_vol_neg", "low_beta") for f in self.factor_names):
if self.mkt_returns is None:
raise ValueError("mkt_returns required for beta-based factors")
betas, resid_vol = _rolling_beta_and_residvol(data, self.mkt_returns, 60)
lib = {
"mom_12_1": lambda: f_mom_12_1(data),
"mom_7_1": lambda: f_mom_7_1(data),
"mom_residual": lambda: f_mom_residual(data, self.mkt_returns, betas=betas),
"recovery_63": lambda: f_recovery_63(data),
"w52_high": lambda: f_w52_high(data),
"idio_vol_neg": lambda: -resid_vol,
"low_beta": lambda: -betas,
"trend": lambda: f_trend_strength(data),
}
return {n: lib[n]() for n in self.factor_names}
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
factors = self._build(data)
ranks = {n: xsec_rank(v) for n, v in factors.items()}
# Product of ranks. If any rank is NaN, product is NaN → row excluded.
composite = None
for rk in ranks.values():
composite = rk if composite is None else composite.mul(rk, fill_value=np.nan)
composite = composite.where(~rk.isna(), np.nan)
sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom")
n_valid = composite.notna().sum(axis=1)
enough = n_valid >= self.top_n
top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1)
if self.weighting == "equal":
raw = top_mask.astype(float)
elif self.weighting == "inv_vol":
vol = data.pct_change(fill_method=None).rolling(60).std()
raw = (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0)
elif self.weighting == "signal":
# Weight ∝ composite^concentration, only among top_mask picks.
score = composite.where(top_mask, 0.0).fillna(0.0)
raw = score ** max(self.signal_concentration, 1.0)
else:
raise ValueError(f"bad weighting {self.weighting!r}")
row_sums = raw.sum(axis=1).replace(0, np.nan)
weights = raw.div(row_sums, axis=0).fillna(0.0)
warmup = 252
rebal_mask = pd.Series(False, index=data.index)
rebal_mask.iloc[list(range(warmup, len(data), self.rebal_freq))] = True
weights[~rebal_mask] = np.nan
weights = weights.ffill().fillna(0.0)
weights.iloc[:warmup] = 0.0
if self.dispersion_scale:
# Cross-sectional rank dispersion = daily std of composite. Scale
# exposure up in high-dispersion regimes (alpha opportunity richer).
disp = composite.std(axis=1)
z = (disp - disp.rolling(252, min_periods=126).mean()) \
/ disp.rolling(252, min_periods=126).std()
scale = (1.0 + 0.3 * z.clip(-1, 1)).clip(0.5, 1.3)
scale = scale.reindex(weights.index).fillna(1.0)
weights = weights.mul(scale, axis=0)
return weights.shift(1).fillna(0.0)
# ---------------------------------------------------------------------------
# Voting top-N
# ---------------------------------------------------------------------------
class VotingFactorStrategy(Strategy):
"""
Top-N by vote count: each factor contributes 1 vote if a stock is in its
top `vote_pct` percentile. Select stocks with vote_count ≥ min_votes,
break ties by sum of ranks.
"""
def __init__(self, factor_names: list[str], top_n: int = 10,
rebal_freq: int = 21, vote_pct: float = 0.25,
min_votes: int = 3, mkt_returns: pd.Series | None = None):
self.factor_names = factor_names
self.top_n = top_n
self.rebal_freq = rebal_freq
self.vote_pct = vote_pct
self.min_votes = min_votes
self.mkt_returns = mkt_returns
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
builder = MultiplicativeFactorStrategy(
factor_names=self.factor_names, top_n=self.top_n,
rebal_freq=self.rebal_freq, mkt_returns=self.mkt_returns)
factors = builder._build(data)
ranks = {n: xsec_rank(v) for n, v in factors.items()}
thresh = 1 - self.vote_pct
votes = sum((rk >= thresh).astype(float) for rk in ranks.values())
rank_sum = sum(rk.fillna(0) for rk in ranks.values())
# Primary sort: vote count; tiebreaker: rank_sum. Build a composite.
composite = votes + rank_sum / (len(ranks) * 10)
composite = composite.where(votes >= self.min_votes, np.nan)
sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom")
n_valid = composite.notna().sum(axis=1)
enough = n_valid >= 1
effective_n = n_valid.clip(upper=self.top_n)
top_mask = (sel_rank <= effective_n.values.reshape(-1, 1)) & enough.values.reshape(-1, 1)
raw = top_mask.astype(float)
row_sums = raw.sum(axis=1).replace(0, np.nan)
weights = raw.div(row_sums, axis=0).fillna(0.0)
warmup = 252
rebal_mask = pd.Series(False, index=data.index)
rebal_mask.iloc[list(range(warmup, len(data), self.rebal_freq))] = True
weights[~rebal_mask] = np.nan
weights = weights.ffill().fillna(0.0)
weights.iloc[:warmup] = 0.0
return weights.shift(1).fillna(0.0)
# ---------------------------------------------------------------------------
# Sub-strategy ensemble
# ---------------------------------------------------------------------------
class SubStrategyEnsemble(Strategy):
"""Equal-weight blend of several long-only sub-strategies."""
def __init__(self, sub_strats: list[Strategy]):
self.sub_strats = sub_strats
self.w = 1.0 / len(sub_strats)
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
out = None
for strat in self.sub_strats:
sig = strat.generate_signals(data) * self.w
out = sig if out is None else out.add(sig, fill_value=0.0)
return out
def default_ensemble(mkt_returns: pd.Series) -> SubStrategyEnsemble:
return SubStrategyEnsemble([
RecoveryMomentumStrategy(top_n=10),
FactorComboStrategy("up_cap+mom_gap", rebal_freq=21, top_n=10),
MultiplicativeFactorStrategy(
factor_names=["mom_12_1", "recovery_63", "idio_vol_neg"],
top_n=10, rebal_freq=21, mkt_returns=mkt_returns,
),
])

136
research/run_interaction.py Normal file
View File

@@ -0,0 +1,136 @@
"""
Evaluate interaction/ensemble strategies on 1/3/5/10y PIT windows with a
proper 500-day warmup preload, so 252d-warmup strategies are active from the
measurement start.
"""
from __future__ import annotations
import os
import warnings
import pandas as pd
import research.pit_backtest as pit
from research.alpha_factors import AlphaFactorStrategy
from research.interaction_alpha import (MultiplicativeFactorStrategy,
SubStrategyEnsemble, VotingFactorStrategy,
default_ensemble)
from strategies.factor_combo import FactorComboStrategy
from strategies.recovery_momentum import RecoveryMomentumStrategy
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
DATA_DIR = "data"
BENCHMARK = "SPY"
def load():
raw = pit.load_pit_prices()
masked = pit.pit_universe(raw)
masked[BENCHMARK] = raw[BENCHMARK]
return masked
def warmup_slice(df, years, warmup_days=500):
measurement_start = df.index[-1] - pd.DateOffset(years=years)
cutoff = max(df.index[0], measurement_start - pd.Timedelta(days=warmup_days * 1.5))
return df[df.index >= cutoff], measurement_start
def measure(eq, start, name=""):
eq = eq[eq.index >= start]
eq = eq / eq.iloc[0] * 10_000
return pit.summarize(eq, name=name)
def make_configs(mkt_ret):
pair = ["mom_12_1", "recovery_63"]
return {
"Mult(mom12×rec63) eq, tn=10":
lambda: MultiplicativeFactorStrategy(
factor_names=pair, top_n=10, rebal_freq=21, mkt_returns=mkt_ret),
"Mult(mom12×rec63) eq, tn=15":
lambda: MultiplicativeFactorStrategy(
factor_names=pair, top_n=15, rebal_freq=21, mkt_returns=mkt_ret),
"Mult(mom12×rec63) eq, rebal=10":
lambda: MultiplicativeFactorStrategy(
factor_names=pair, top_n=10, rebal_freq=10, mkt_returns=mkt_ret),
"Mult(mom12×rec63) sig^2, tn=15":
lambda: MultiplicativeFactorStrategy(
factor_names=pair, top_n=15, rebal_freq=21, mkt_returns=mkt_ret,
weighting="signal", signal_concentration=2.0),
"Mult(mom12×rec63) sig^4, tn=15":
lambda: MultiplicativeFactorStrategy(
factor_names=pair, top_n=15, rebal_freq=21, mkt_returns=mkt_ret,
weighting="signal", signal_concentration=4.0),
"Mult(mom12×rec63) disp-scale":
lambda: MultiplicativeFactorStrategy(
factor_names=pair, top_n=10, rebal_freq=21, mkt_returns=mkt_ret,
dispersion_scale=True),
"Mult(mom12×rec63) inv_vol":
lambda: MultiplicativeFactorStrategy(
factor_names=pair, top_n=10, rebal_freq=21, mkt_returns=mkt_ret,
weighting="inv_vol"),
"Ensemble3 (RM/upcap/mult)":
lambda: default_ensemble(mkt_ret),
"Recovery+Mom Top10":
lambda: RecoveryMomentumStrategy(top_n=10),
"fc_up_cap+mom_gap":
lambda: FactorComboStrategy("up_cap+mom_gap", rebal_freq=21, top_n=10),
}
def main():
print("Loading PIT data…")
masked = load()
tickers = [c for c in masked.columns if c != BENCHMARK]
mkt_ret = masked[BENCHMARK].pct_change(fill_method=None)
print(f" shape={masked.shape} range={masked.index[0].date()}{masked.index[-1].date()}")
rows = []
for years in (10, 5, 3, 1):
sliced, start = warmup_slice(masked, years, warmup_days=500)
prices = sliced[tickers]
print(f"\n--- {years}y window "
f"(measure {start.date()}{sliced.index[-1].date()}, "
f"warmup from {sliced.index[0].date()}) ---")
spy = sliced[BENCHMARK].dropna()
spy_eq = (spy / spy.iloc[0]) * 10_000
m = measure(spy_eq, start, "")
rows.append({"years": years, "strategy": "SPY buy-and-hold",
**{k: v for k, v in m.items() if k != "name"}})
configs = make_configs(mkt_ret)
for name, factory in configs.items():
strat = factory()
eq = pit.backtest(strategy=strat, prices=prices,
initial_capital=10_000, transaction_cost=0.001)
m = measure(eq, start, "")
rows.append({"years": years, "strategy": name,
**{k: v for k, v in m.items() if k != "name"}})
tail = [r for r in rows if r["years"] == years]
tail.sort(key=lambda r: r["Sharpe"], reverse=True)
for r in tail:
print(f" {r['strategy']:<34s} CAGR={r['CAGR']*100:>6.1f}% "
f"Sharpe={r['Sharpe']:>5.2f} Sortino={r['Sortino']:>5.2f} "
f"MaxDD={r['MaxDD']*100:>6.1f}% Calmar={r['Calmar']:>5.2f}")
df = pd.DataFrame(rows)
df.to_csv(os.path.join(DATA_DIR, "interaction_results.csv"), index=False)
print("\n=== Cross-window CAGR summary (sorted by 10y Sharpe) ===")
pv = df.pivot(index="strategy", columns="years", values="CAGR")
pv.columns = [f"CAGR_{y}y" for y in pv.columns]
sh10 = df[df["years"] == 10].set_index("strategy")["Sharpe"]
pv["Sharpe_10y"] = sh10
pv = pv.sort_values("Sharpe_10y", ascending=False)
print(pv.to_string(formatters={
"CAGR_10y": "{:.1%}".format, "CAGR_5y": "{:.1%}".format,
"CAGR_3y": "{:.1%}".format, "CAGR_1y": "{:.1%}".format,
"Sharpe_10y": "{:.2f}".format,
}))
if __name__ == "__main__":
main()

145
research/strategy_sweep.py Normal file
View File

@@ -0,0 +1,145 @@
"""
Unified 3/5/10-year PIT backtest for every production strategy.
Runs the full strategy roster against the point-in-time S&P 500 price matrix
from research/pit_backtest and reports CAGR / Sharpe / Sortino / MaxDD / Calmar
for three trailing windows. Results are written to data/sweep_<years>y.csv and
printed to stdout.
Usage:
uv run python -m research.strategy_sweep
"""
import os
import pandas as pd
import research.pit_backtest as pit
from strategies.adaptive_momentum import AdaptiveMomentumStrategy
from strategies.dual_momentum import DualMomentumStrategy
from strategies.factor_combo import SIGNAL_REGISTRY, FactorComboStrategy
from strategies.inverse_vol import InverseVolatilityStrategy
from strategies.mean_reversion import MeanReversionStrategy
from strategies.momentum import MomentumStrategy
from strategies.momentum_quality import MomentumQualityStrategy
from strategies.multi_factor import MultiFactorStrategy
from strategies.recovery_momentum import RecoveryMomentumStrategy
from strategies.trend_following import TrendFollowingStrategy
DATA_DIR = "data"
BENCHMARK = "SPY"
def build_strategies(tickers: list[str]) -> dict:
"""Instantiate every production strategy; returns {name: strategy}."""
top_n = max(5, len(tickers) // 10)
strategies: dict = {
# --- Baselines ---
"SPY buy-and-hold": None, # handled separately
"Momentum": MomentumStrategy(lookback=252, skip=21, top_n=top_n),
"Inverse Volatility": InverseVolatilityStrategy(vol_window=20),
"Multi-Factor": MultiFactorStrategy(tickers=tickers, benchmark=BENCHMARK,
top_n=top_n),
"Mean Reversion": MeanReversionStrategy(top_n=top_n),
"Trend Following": TrendFollowingStrategy(ma_window=150, momentum_period=126,
top_n=top_n),
"Dual Momentum": DualMomentumStrategy(top_n=top_n),
"Momentum+Quality": MomentumQualityStrategy(momentum_period=252, skip=21,
top_n=top_n),
"Mom+InvVol": AdaptiveMomentumStrategy(top_n=top_n),
"Recovery+Mom Top20": RecoveryMomentumStrategy(top_n=min(20, top_n)),
"Recovery+Mom Top10": RecoveryMomentumStrategy(top_n=10),
}
# Factor-combo (monthly rebalance; biweekly is the other interesting one,
# but monthly aligns with how the RecoveryMomentum defaults are set).
for name in SIGNAL_REGISTRY:
key = f"fc_{name.replace('+', '_').replace('×', 'x')}_monthly"
strategies[key] = FactorComboStrategy(name, rebal_freq=21, top_n=10)
return strategies
def slice_years(prices: pd.DataFrame, years: int) -> pd.DataFrame:
cutoff = prices.index[-1] - pd.DateOffset(years=years)
return prices[prices.index >= cutoff]
def run_one(name: str, strat, prices: pd.DataFrame,
tickers: list[str]) -> dict:
if strat is None:
# SPY buy-and-hold
spy = prices[BENCHMARK].dropna()
eq = (spy / spy.iloc[0]) * 10_000
return {"strategy": name, **{k: v for k, v in pit.summarize(eq, name=name).items()
if k != "name"}}
# MultiFactor needs the benchmark column → pass full `prices`; others only tickers.
if isinstance(strat, MultiFactorStrategy):
strat_prices = prices # keep SPY column
else:
strat_prices = prices[tickers]
eq = pit.backtest(strategy=strat, prices=strat_prices, initial_capital=10_000,
transaction_cost=0.001)
return {"strategy": name, **{k: v for k, v in pit.summarize(eq, name=name).items()
if k != "name"}}
def fmt(row: dict) -> str:
return (f" {row['strategy']:<44s} "
f"CAGR={row['CAGR']*100:>6.1f}% "
f"Sharpe={row['Sharpe']:>5.2f} "
f"Sortino={row['Sortino']:>5.2f} "
f"MaxDD={row['MaxDD']*100:>6.1f}% "
f"Calmar={row['Calmar']:>5.2f}")
def main() -> None:
print("Loading point-in-time price data…")
raw = pit.load_pit_prices()
masked = pit.pit_universe(raw)
# Preserve SPY even though it's not in the membership intervals.
if BENCHMARK in raw.columns:
masked[BENCHMARK] = raw[BENCHMARK]
tickers = [c for c in masked.columns if c != BENCHMARK]
print(f" tickers={len(tickers)} rows={len(masked)} "
f"range={masked.index[0].date()}{masked.index[-1].date()}")
all_results: dict[int, pd.DataFrame] = {}
for years in (10, 5, 3):
sliced = slice_years(masked, years)
strategies = build_strategies(tickers)
print("\n" + "=" * 110)
print(f"Window = last {years} years ({sliced.index[0].date()}{sliced.index[-1].date()})")
print("=" * 110)
rows = []
for name, strat in strategies.items():
try:
rows.append(run_one(name, strat, sliced, tickers))
except Exception as exc: # noqa: BLE001
print(f" [skip] {name}: {type(exc).__name__}: {exc}")
continue
df = pd.DataFrame(rows).sort_values("Sharpe", ascending=False)
for _, r in df.iterrows():
print(fmt(r))
out = os.path.join(DATA_DIR, f"sweep_{years}y.csv")
df.to_csv(out, index=False)
all_results[years] = df
print(f" → saved {out}")
# Cross-window comparison: only strategies present in all windows.
print("\n" + "=" * 110)
print("Cross-window CAGR comparison (sorted by 10y Sharpe)")
print("=" * 110)
pivot = pd.DataFrame({
f"CAGR_{y}y": all_results[y].set_index("strategy")["CAGR"]
for y in (10, 5, 3)
})
sharpe10 = all_results[10].set_index("strategy")["Sharpe"]
pivot["Sharpe_10y"] = sharpe10
pivot = pivot.sort_values("Sharpe_10y", ascending=False)
print(pivot.to_string(formatters={
"CAGR_10y": "{:.1%}".format, "CAGR_5y": "{:.1%}".format,
"CAGR_3y": "{:.1%}".format, "Sharpe_10y": "{:.2f}".format,
}))
if __name__ == "__main__":
main()