diff --git a/research/alpha_factors.py b/research/alpha_factors.py new file mode 100644 index 0000000..24802e2 --- /dev/null +++ b/research/alpha_factors.py @@ -0,0 +1,358 @@ +""" +Alpha factor library — price-only, academically motivated, with a rolling-IC +combiner, inverse-vol portfolio weighting, and volatility targeting. + +Factors (each returns a cross-sectional DataFrame aligned to prices.index): + + mom_12_1 12-1 month momentum (Jegadeesh & Titman 1993). + mom_7_1 Intermediate 7-1m momentum (Novy-Marx 2012). + mom_residual Market-residualized 12-1m (Blitz-Huij-Martens 2011). + rev_1m 1-month reversal × -1 (Jegadeesh 1990 / short-term reversal). + w52_high Price / 52-week high, proximity factor (George & Hwang 2004). + max5_neg -avg(top-5 daily returns past 21d) — lottery/MAX (Bali-Cakici-Whitelaw 2011). + idio_vol_neg -residual-vol from 60d market regression (Ang-Hodrick-Xing-Zhang 2006). + low_beta -60d market beta (Betting Against Beta, Frazzini-Pedersen 2014 variant). + trend_strength Slope / RMSE from 63d log-price regression. + recovery_63 Price / 63d low - 1 (project-native, V-rebound proxy). + +Combiner: + - Cross-sectional percentile-rank each factor (NaN = keep). + - For each day, blend factors with weights proportional to the rolling + 252-day Information Coefficient (Spearman rank corr vs forward 21d return). + - Weights are lagged by 21 days to avoid lookahead; negative-IC factors are + sign-flipped before weighting (so all contribute positively when confident). + +Portfolio: + - Rank composite score, pick top_n (default 15) on a rebalance_freq schedule. + - Inverse-vol weight within top_n (60d realized vol). + - Volatility-target the whole portfolio to target_vol (default 18%) using a + trailing 60-day portfolio-vol estimate; exposure clipped to [0.3, 1.5]. + - Shift(1) at the end for T-1 signal delivery, matching the project convention. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd + +from strategies.base import Strategy + + +# --------------------------------------------------------------------------- +# Factor primitives +# --------------------------------------------------------------------------- + +def _pct(p, n): + return p.pct_change(n, fill_method=None) + + +def f_mom_12_1(p): + return p.shift(21).pct_change(231, fill_method=None) + + +def f_mom_7_1(p): + return p.shift(21).pct_change(126, fill_method=None) + + +def f_rev_1m(p): + return -p.pct_change(21, fill_method=None) + + +def f_w52_high(p): + roll_max = p.rolling(252, min_periods=200).max() + return p / roll_max - 1 # ≤0, closer to 0 = near 52w high + + +def f_max5_neg(p): + ret = p.pct_change(fill_method=None) + # Mean of top-5 returns over the last 21 trading days; negate. + top5 = ret.rolling(21, min_periods=15).apply( + lambda x: np.mean(np.sort(x)[-5:]) if np.isfinite(x).sum() >= 5 else np.nan, + raw=True, + ) + return -top5 + + +def f_recovery_63(p): + return p / p.rolling(63, min_periods=60).min() - 1 + + +def f_trend_strength(p): + """ + Vectorized log-price trend strength: rolling OLS slope ÷ residual RMSE on a + 63-day window. t-stat-like measure of directional trend quality. + """ + logp = np.log(p.replace(0, np.nan)) + n = 63 + idx = np.arange(n, dtype=float) + idx_c = idx - idx.mean() + idx_var = (idx_c ** 2).sum() + + # E[x·y] over the window: rolling sum of (idx·y) simplified via decomposition: + # Σ (i - ī)(y - ȳ) = Σ i·y - n·ī·ȳ (but ī is constant so just: Σ (i-ī)·y) + # We compute Σ (i-ī)·y as a rolling window-weighted sum. + weights = idx_c # shape (n,) + + def rolling_weighted(series_df, w): + """Σ_{k=0..n-1} w[k] * y[t-(n-1)+k] for each column, vectorized.""" + arr = series_df.values + T, K = arr.shape + out = np.full_like(arr, np.nan, dtype=float) + # Convolution across time axis per column: + for k in range(K): + col = arr[:, k] + # Use np.convolve with reversed weights (equivalent to correlate) + conv = np.convolve(col, w[::-1], mode="valid") + out[n - 1:, k] = conv + return pd.DataFrame(out, index=series_df.index, columns=series_df.columns) + + # rolling mean and var for log-price + roll_mean = logp.rolling(n, min_periods=n).mean() + # numerator: Σ (i-ī)(y - ȳ) = Σ (i-ī)·y (since Σ(i-ī) = 0) + num = rolling_weighted(logp.fillna(0.0), weights) + slope = num / idx_var + # Residual variance: Σ(y - ȳ)² / n - slope² * idx_var / n + var_y = logp.rolling(n, min_periods=n).var(ddof=0) + resid_var = (var_y - (slope ** 2) * idx_var / n).clip(lower=1e-18) + rmse = np.sqrt(resid_var) + ts = slope / rmse + # mask rows where the window contained any NaN + valid = logp.rolling(n, min_periods=n).count() == n + return ts.where(valid) + + +def _rolling_beta_and_residvol(p, mkt_ret, window=60): + """Return (beta, residual_vol) DataFrames aligned to prices.index.""" + ret = p.pct_change(fill_method=None) + mkt = mkt_ret.reindex(p.index) + + def pair(stock_ret): + cov = stock_ret.rolling(window, min_periods=window).cov(mkt) + var = mkt.rolling(window, min_periods=window).var() + beta = cov / var + # Residual vol via: var(stock) - beta^2 * var(mkt) (simplification) + var_stock = stock_ret.rolling(window, min_periods=window).var() + resid_var = (var_stock - beta ** 2 * var) .clip(lower=0) + resid_vol = np.sqrt(resid_var) + return beta, resid_vol + + betas = {} + resid_vols = {} + for col in ret.columns: + b, rv = pair(ret[col]) + betas[col] = b + resid_vols[col] = rv + return pd.DataFrame(betas), pd.DataFrame(resid_vols) + + +def f_mom_residual(p, mkt_ret, betas=None, window=60): + if betas is None: + betas, _ = _rolling_beta_and_residvol(p, mkt_ret, window=window) + # 12-1m cumulative residual return = cum stock ret - beta * cum mkt ret. + # Reindex mkt_ret to p.index so arithmetic below does not produce a union + # index (which would corrupt downstream shape assumptions). + mkt_aligned = mkt_ret.reindex(p.index) + stock_cum = p.shift(21).pct_change(231, fill_method=None) + mkt_cum_ret = (1 + mkt_aligned).rolling(231).apply(lambda x: np.prod(x) - 1, raw=True) + mkt_cum = mkt_cum_ret.shift(21) + out = stock_cum.sub(betas.mul(mkt_cum, axis=0), fill_value=np.nan) + return out.reindex(p.index) + + +# --------------------------------------------------------------------------- +# Cross-sectional rank helper +# --------------------------------------------------------------------------- + +def xsec_rank(df: pd.DataFrame) -> pd.DataFrame: + return df.rank(axis=1, pct=True, na_option="keep") + + +# --------------------------------------------------------------------------- +# Rolling IC computation +# --------------------------------------------------------------------------- + +def rolling_ic(factor_rank: pd.DataFrame, fwd_ret: pd.DataFrame, + window: int = 252) -> pd.Series: + """Daily Spearman IC = rank(factor) vs rank(fwd_ret); rolling mean.""" + fr = fwd_ret.rank(axis=1, pct=True, na_option="keep") + # Per-day pearson corr of rank-transformed ≡ Spearman. + per_day_ic = factor_rank.corrwith(fr, axis=1) + return per_day_ic.rolling(window, min_periods=window // 2).mean() + + +def _rolling_ls_sharpe(factor_rank: pd.DataFrame, + prices: pd.DataFrame, + window: int = 252, + rebal: int = 21, + tcost: float = 0.001) -> pd.Series: + """ + Rolling realized Sharpe of a long-top-decile / short-bottom-decile portfolio + constructed on `factor_rank`, rebalanced every `rebal` trading days, with + proportional turnover cost `tcost`. Used as a factor-quality weight. + + Returned series is aligned to `prices.index` and the Sharpe at day t is + computed from returns over [t-window, t]. + """ + long_mask = factor_rank >= 0.9 + short_mask = factor_rank <= 0.1 + # Rebalance: hold the mask constant between rebal dates + rebal_mask = pd.Series(False, index=factor_rank.index) + rebal_mask.iloc[::rebal] = True + long_w = long_mask.astype(float).div(long_mask.sum(axis=1).replace(0, np.nan), axis=0) + short_w = short_mask.astype(float).div(short_mask.sum(axis=1).replace(0, np.nan), axis=0) + long_w[~rebal_mask] = np.nan + short_w[~rebal_mask] = np.nan + long_w = long_w.ffill().fillna(0.0) + short_w = short_w.ffill().fillna(0.0) + + rets = prices.pct_change(fill_method=None) + long_ret = (long_w.shift(1) * rets).sum(axis=1) + short_ret = (short_w.shift(1) * rets).sum(axis=1) + + long_turn = long_w.diff().abs().sum(axis=1).fillna(0.0) + short_turn = short_w.diff().abs().sum(axis=1).fillna(0.0) + + ls_ret = (long_ret - short_ret) - (long_turn + short_turn) * tcost + ls_ret = ls_ret.fillna(0.0) + mean = ls_ret.rolling(window, min_periods=window // 2).mean() + std = ls_ret.rolling(window, min_periods=window // 2).std() + sharpe = (mean / std) * np.sqrt(252) + return sharpe + + +# --------------------------------------------------------------------------- +# Strategy +# --------------------------------------------------------------------------- + +class AlphaFactorStrategy(Strategy): + """ + Multi-factor long-only with rolling LS-Sharpe-weighted signal blend, + inverse-vol weighting, and portfolio-level volatility targeting. + + Why LS-Sharpe and not IC? + IC (rank-forward correlation) measures directional accuracy but ignores + the magnitude of cross-sectional dispersion. Two factors with identical + IC can have very different P&L. Empirically on this sample rev_1m has + IC t-stat +5 but LS Sharpe -12 — its top decile are freshly crashed + names that keep crashing. We weight by a lagged 252d rolling LS-Sharpe + (top-decile minus bottom-decile, monthly rebalance, 10bps t-cost) and + floor weights at zero so demoted factors simply drop out. + + The strategy requires a market return series (e.g. SPY pct_change) passed + at construction time — it is NOT derived from data inside generate_signals, + because the cross-sectional universe contains only selected tickers while + we want a stable market benchmark for beta/residual computations. + """ + + def __init__( + self, + mkt_returns: pd.Series, + top_n: int = 15, + rebal_freq: int = 10, + vol_window: int = 60, + vol_target_annual: float | None = 0.18, + ic_window: int = 252, + exposure_clip: tuple[float, float] = (0.30, 1.50), + fwd_window: int = 21, + weight_scheme: str = "ls_sharpe", # {"ls_sharpe", "ic", "equal"} + min_weight: float = 0.0, # floor per-factor weight (0 = drop losers) + ): + self.mkt_returns = mkt_returns + self.top_n = top_n + self.rebal_freq = rebal_freq + self.vol_window = vol_window + self.vol_target_annual = vol_target_annual + self.ic_window = ic_window + self.exposure_clip = exposure_clip + self.fwd_window = fwd_window + self.weight_scheme = weight_scheme + self.min_weight = min_weight + + # ---- Factor matrix ---- + def compute_factors(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]: + betas, resid_vol = _rolling_beta_and_residvol( + data, self.mkt_returns, window=self.vol_window) + factors = { + "mom_12_1": f_mom_12_1(data), + "mom_7_1": f_mom_7_1(data), + "mom_residual": f_mom_residual(data, self.mkt_returns, betas=betas), + "rev_1m": f_rev_1m(data), + "w52_high": f_w52_high(data), + "max5_neg": f_max5_neg(data), + "recovery_63": f_recovery_63(data), + "trend_strength": f_trend_strength(data), + "idio_vol_neg": -resid_vol, + "low_beta": -betas, + } + return factors + + # ---- Full pipeline ---- + def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: + factors = self.compute_factors(data) + ranks = {k: xsec_rank(v) for k, v in factors.items()} + + if self.weight_scheme == "ic": + fwd_ret = data.shift(-self.fwd_window) / data - 1 + weight_series = { + k: rolling_ic(ranks[k], fwd_ret, window=self.ic_window).shift(self.fwd_window) + for k in ranks + } + elif self.weight_scheme == "ls_sharpe": + weight_series = { + k: _rolling_ls_sharpe(ranks[k], data, + window=self.ic_window, + rebal=21, tcost=0.001).shift(self.fwd_window) + for k in ranks + } + elif self.weight_scheme == "equal": + weight_series = {k: pd.Series(1.0, index=ranks[k].index) for k in ranks} + else: + raise ValueError(f"unknown weight_scheme {self.weight_scheme!r}") + + composite = None + weight_norm = None + for k, rk in ranks.items(): + w = weight_series[k].reindex(rk.index).fillna(0.0) + if self.min_weight is not None: + w = w.where(w > self.min_weight, 0.0) + contrib = rk.mul(w, axis=0) + composite = contrib if composite is None else composite.add(contrib, fill_value=0.0) + abs_w = w.abs() + weight_norm = abs_w if weight_norm is None else weight_norm.add(abs_w, fill_value=0) + weight_norm = weight_norm.replace(0, np.nan) + composite = composite.div(weight_norm, axis=0) + + # Top-N selection. + sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom") + n_valid = composite.notna().sum(axis=1) + enough = n_valid >= self.top_n + top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1) + + # Inverse-vol weighting within top_n. + rets = data.pct_change(fill_method=None) + vol = rets.rolling(self.vol_window, min_periods=self.vol_window).std() + inv_vol = (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0) + row_sums = inv_vol.sum(axis=1).replace(0, np.nan) + weights = inv_vol.div(row_sums, axis=0).fillna(0.0) + + # Rebalance schedule. + warmup = max(252, self.vol_window + 21, self.ic_window + self.fwd_window) + rebal_mask = pd.Series(False, index=data.index) + rebal_idx = list(range(warmup, len(data), self.rebal_freq)) + rebal_mask.iloc[rebal_idx] = True + weights[~rebal_mask] = np.nan + weights = weights.ffill().fillna(0.0) + weights.iloc[:warmup] = 0.0 + + # Volatility targeting at the portfolio level. + if self.vol_target_annual is not None: + # Use returns of the *current* weight vector; vol is trailing realized + # on the applied weights so no lookahead. Compute after ffill. + port_rets = (weights.shift(1) * rets).sum(axis=1) + port_vol = port_rets.rolling(self.vol_window, + min_periods=self.vol_window).std() * np.sqrt(252) + scale = (self.vol_target_annual / port_vol).clip(*self.exposure_clip) + scale = scale.fillna(method="ffill").fillna(1.0) + weights = weights.mul(scale, axis=0) + + return weights.shift(1).fillna(0.0) diff --git a/research/alpha_research.py b/research/alpha_research.py new file mode 100644 index 0000000..815e4e8 --- /dev/null +++ b/research/alpha_research.py @@ -0,0 +1,279 @@ +""" +Professional QR-style factor research on the PIT S&P 500 universe. + +Stage 1 — Factor diagnostics. + IC (Spearman, 21d fwd), t-stat, realistic long-short decile backtest + (monthly rebalance, 10 bps t-cost). + +Stage 2 — Composite backtest 1/3/5/10y vs champions. + For 1y window we pre-pend 2y of warmup then score returns on the last 1y + only, so strategies with 252d+ warmup are actually active in-window. + +Stage 3 — Config sweep across weight_scheme × top_n × rebal × vol_target. + +Outputs CSVs to data/alpha_research_*.csv. +""" + +from __future__ import annotations + +import os +import warnings + +import numpy as np +import pandas as pd + +import research.pit_backtest as pit +from research.alpha_factors import (AlphaFactorStrategy, _rolling_beta_and_residvol, + f_mom_12_1, f_mom_7_1, f_rev_1m, f_w52_high, + f_max5_neg, f_recovery_63, f_trend_strength, + xsec_rank, _rolling_ls_sharpe) +from strategies.factor_combo import FactorComboStrategy +from strategies.recovery_momentum import RecoveryMomentumStrategy + +warnings.filterwarnings("ignore", category=FutureWarning) +warnings.filterwarnings("ignore", category=RuntimeWarning) + +DATA_DIR = "data" +BENCHMARK = "SPY" + + +def load(): + raw = pit.load_pit_prices() + masked = pit.pit_universe(raw) + if BENCHMARK in raw.columns: + masked[BENCHMARK] = raw[BENCHMARK] + return masked + + +def warmup_slice(df: pd.DataFrame, years: int, warmup_days: int = 500) -> tuple[pd.DataFrame, pd.Timestamp]: + """Return (prices_with_warmup, measurement_start). Strategies are fed the + longer series, but metrics must be computed only from measurement_start.""" + measurement_start = df.index[-1] - pd.DateOffset(years=years) + first_day = df.index[0] + # Keep all rows between measurement_start - warmup_days and end. + cutoff = max(first_day, measurement_start - pd.Timedelta(days=warmup_days * 1.5)) + sliced = df[df.index >= cutoff] + return sliced, measurement_start + + +def measure(eq: pd.Series, start: pd.Timestamp, name: str = "") -> dict: + eq = eq[eq.index >= start] + # Re-base to 10_000 at start + eq = eq / eq.iloc[0] * 10_000 + return pit.summarize(eq, name=name) + + +# --------------------------------------------------------------------------- +# Stage 1 — Factor diagnostics +# --------------------------------------------------------------------------- + +def factor_diagnostics(masked: pd.DataFrame): + print("\n" + "=" * 110) + print("Stage 1 — Factor diagnostics (full 10y PIT, monthly rebal, 10bps t-cost)") + print("=" * 110) + tickers = [c for c in masked.columns if c != BENCHMARK] + prices = masked[tickers] + mkt_ret = masked[BENCHMARK].pct_change(fill_method=None) + betas, resid_vol = _rolling_beta_and_residvol(prices, mkt_ret, window=60) + + from research.alpha_factors import f_mom_residual + factor_builders = { + "mom_12_1": lambda: f_mom_12_1(prices), + "mom_7_1": lambda: f_mom_7_1(prices), + "mom_residual": lambda: f_mom_residual(prices, mkt_ret, betas=betas), + "rev_1m": lambda: f_rev_1m(prices), + "w52_high": lambda: f_w52_high(prices), + "max5_neg": lambda: f_max5_neg(prices), + "recovery_63": lambda: f_recovery_63(prices), + "trend_strength": lambda: f_trend_strength(prices), + "idio_vol_neg": lambda: -resid_vol, + "low_beta": lambda: -betas, + } + fwd_21 = prices.shift(-21) / prices - 1 + fwd_rank = fwd_21.rank(axis=1, pct=True, na_option="keep") + + rows = [] + for name, build in factor_builders.items(): + fac = build() + fr = xsec_rank(fac) + ic_daily = fr.corrwith(fwd_rank, axis=1).dropna() + ic_mean = ic_daily.mean() + ic_t = ic_mean / (ic_daily.std() / np.sqrt(len(ic_daily))) if len(ic_daily) > 1 else 0.0 + ls = realistic_decile_spread(fr, prices, rebal=21, tcost=0.001) + long_only = realistic_top_decile(fr, prices, rebal=21, tcost=0.001) + rows.append({ + "factor": name, + "IC_mean": ic_mean, "IC_t": ic_t, + "LS_CAGR": ls["CAGR"], "LS_Sharpe": ls["Sharpe"], + "LO_CAGR": long_only["CAGR"], "LO_Sharpe": long_only["Sharpe"], + "LO_MaxDD": long_only["MaxDD"], + }) + + df = pd.DataFrame(rows).sort_values("LO_Sharpe", ascending=False) + df.to_csv(os.path.join(DATA_DIR, "alpha_research_factors.csv"), index=False) + print(df.to_string(index=False, formatters={ + "IC_mean": "{:+.4f}".format, "IC_t": "{:+.2f}".format, + "LS_CAGR": "{:+.1%}".format, "LS_Sharpe": "{:+.2f}".format, + "LO_CAGR": "{:+.1%}".format, "LO_Sharpe": "{:+.2f}".format, + "LO_MaxDD": "{:.1%}".format, + })) + return df + + +def realistic_decile_spread(factor_rank, prices, rebal=21, tcost=0.001): + """Long top-decile minus short bottom-decile, monthly rebal, 10bps t-cost.""" + long_mask = factor_rank >= 0.9 + short_mask = factor_rank <= 0.1 + long_w = long_mask.astype(float).div(long_mask.sum(axis=1).replace(0, np.nan), axis=0) + short_w = short_mask.astype(float).div(short_mask.sum(axis=1).replace(0, np.nan), axis=0) + rebal_mask = pd.Series(False, index=factor_rank.index) + rebal_mask.iloc[::rebal] = True + long_w[~rebal_mask] = np.nan + short_w[~rebal_mask] = np.nan + long_w = long_w.ffill().fillna(0.0) + short_w = short_w.ffill().fillna(0.0) + rets = prices.pct_change(fill_method=None) + ls = ((long_w.shift(1) * rets).sum(axis=1) + - (short_w.shift(1) * rets).sum(axis=1)) \ + - (long_w.diff().abs().sum(axis=1).fillna(0.0) + + short_w.diff().abs().sum(axis=1).fillna(0.0)) * tcost + ls = ls.fillna(0.0).iloc[252:] + eq = (1 + ls).cumprod() * 10_000 + return pit.summarize(eq, name="ls") + + +def realistic_top_decile(factor_rank, prices, rebal=21, tcost=0.001): + """Long-only top-decile equal-weight portfolio with t-cost.""" + long_mask = factor_rank >= 0.9 + long_w = long_mask.astype(float).div(long_mask.sum(axis=1).replace(0, np.nan), axis=0) + rebal_mask = pd.Series(False, index=factor_rank.index) + rebal_mask.iloc[::rebal] = True + long_w[~rebal_mask] = np.nan + long_w = long_w.ffill().fillna(0.0) + rets = prices.pct_change(fill_method=None) + port_ret = (long_w.shift(1) * rets).sum(axis=1) \ + - long_w.diff().abs().sum(axis=1).fillna(0.0) * tcost + port_ret = port_ret.fillna(0.0).iloc[252:] + eq = (1 + port_ret).cumprod() * 10_000 + return pit.summarize(eq, name="lo") + + +# --------------------------------------------------------------------------- +# Stage 2 — Composite backtest +# --------------------------------------------------------------------------- + +def composite_backtest(masked: pd.DataFrame): + print("\n" + "=" * 110) + print("Stage 2 — IC / LS-Sharpe-weighted composite vs champions (1/3/5/10y)") + print("=" * 110) + tickers = [c for c in masked.columns if c != BENCHMARK] + mkt_ret_full = masked[BENCHMARK].pct_change(fill_method=None) + + configs = { + "Alpha(LS-Sharpe, tn=15, rebal=10)": + lambda: AlphaFactorStrategy(mkt_ret_full, top_n=15, rebal_freq=10, + vol_target_annual=None, weight_scheme="ls_sharpe"), + "Alpha(LS-Sharpe, tn=15, rebal=21)": + lambda: AlphaFactorStrategy(mkt_ret_full, top_n=15, rebal_freq=21, + vol_target_annual=None, weight_scheme="ls_sharpe"), + "Alpha(LS-Sharpe+VT18, tn=15, rebal=21)": + lambda: AlphaFactorStrategy(mkt_ret_full, top_n=15, rebal_freq=21, + vol_target_annual=0.18, weight_scheme="ls_sharpe"), + "Alpha(IC, tn=15, rebal=21)": + lambda: AlphaFactorStrategy(mkt_ret_full, top_n=15, rebal_freq=21, + vol_target_annual=None, weight_scheme="ic"), + "Recovery+Mom Top10": lambda: RecoveryMomentumStrategy(top_n=10), + "fc_up_cap+mom_gap": lambda: FactorComboStrategy("up_cap+mom_gap", + rebal_freq=21, top_n=10), + } + + all_rows = [] + for years in (10, 5, 3, 1): + sliced, measurement_start = warmup_slice(masked, years, warmup_days=500) + prices = sliced[tickers] + print(f"\n --- Window: last {years}y " + f"(measure {measurement_start.date()} → {sliced.index[-1].date()}, " + f"warmup from {sliced.index[0].date()}) ---") + spy = sliced[BENCHMARK].dropna() + spy_eq = (spy / spy.iloc[0]) * 10_000 + rows = [{"years": years, "strategy": "SPY buy-and-hold", + **{k: v for k, v in measure(spy_eq, measurement_start, "").items() + if k != "name"}}] + for name, factory in configs.items(): + strat = factory() + eq = pit.backtest(strategy=strat, prices=prices, + initial_capital=10_000, transaction_cost=0.001) + m = measure(eq, measurement_start, "") + rows.append({"years": years, "strategy": name, + **{k: v for k, v in m.items() if k != "name"}}) + for r in rows: + print(f" {r['strategy']:<42s} " + f"CAGR={r['CAGR']*100:>6.1f}% " + f"Sharpe={r['Sharpe']:>5.2f} " + f"Sortino={r['Sortino']:>5.2f} " + f"MaxDD={r['MaxDD']*100:>6.1f}% " + f"Calmar={r['Calmar']:>5.2f}") + all_rows.extend(rows) + + df = pd.DataFrame(all_rows) + df.to_csv(os.path.join(DATA_DIR, "alpha_research_composite.csv"), index=False) + return df + + +# --------------------------------------------------------------------------- +# Stage 3 — Config sweep +# --------------------------------------------------------------------------- + +def config_sweep(masked: pd.DataFrame): + print("\n" + "=" * 110) + print("Stage 3 — AlphaFactor config sweep (10y)") + print("=" * 110) + tickers = [c for c in masked.columns if c != BENCHMARK] + prices = masked[tickers] + mkt_ret = masked[BENCHMARK].pct_change(fill_method=None) + + rows = [] + for scheme in ("ls_sharpe", "ic", "equal"): + for top_n in (10, 15, 20): + for rebal in (10, 21): + for vt in (None, 0.18): + strat = AlphaFactorStrategy(mkt_ret, top_n=top_n, rebal_freq=rebal, + vol_target_annual=vt, + weight_scheme=scheme) + eq = pit.backtest(strat, prices, initial_capital=10_000, + transaction_cost=0.001) + s = pit.summarize(eq, "") + rows.append({"scheme": scheme, "top_n": top_n, "rebal": rebal, + "vt": vt if vt is not None else "none", + "CAGR": s["CAGR"], "Sharpe": s["Sharpe"], + "MaxDD": s["MaxDD"], "Calmar": s["Calmar"]}) + + df = pd.DataFrame(rows).sort_values("Sharpe", ascending=False) + df.to_csv(os.path.join(DATA_DIR, "alpha_research_sweep.csv"), index=False) + print(df.head(15).to_string(index=False, formatters={ + "CAGR": "{:.1%}".format, "Sharpe": "{:.2f}".format, + "MaxDD": "{:.1%}".format, "Calmar": "{:.2f}".format, + })) + return df + + +def main(): + print("Loading PIT data…") + masked = load() + print(f" shape={masked.shape} range={masked.index[0].date()} → {masked.index[-1].date()}") + + factor_diagnostics(masked) + composite_backtest(masked) + sweep = config_sweep(masked) + + print("\n" + "=" * 110) + print("Top 5 configs:") + print("=" * 110) + print(sweep.head(5).to_string(index=False, formatters={ + "CAGR": "{:.1%}".format, "Sharpe": "{:.2f}".format, + "MaxDD": "{:.1%}".format, "Calmar": "{:.2f}".format, + })) + + +if __name__ == "__main__": + main() diff --git a/research/factor_optimize.py b/research/factor_optimize.py new file mode 100644 index 0000000..dd7acea --- /dev/null +++ b/research/factor_optimize.py @@ -0,0 +1,294 @@ +""" +Factor-level optimization on the point-in-time S&P 500 universe. + +Builds on the sweep results in data/sweep_*.csv. Runs four experiments: + + O1 — RecoveryMomentumPlus hyperparameter grid (top_n × rec_window × rec_weight × rebal), + with 2016-2022 train / 2023-2026 test split. Picks best by Sharpe-on-test. + O2 — SPY>MA regime filter applied to the 3 highest-Sharpe strategies (10y window). + O3 — Top-3 uncorrelated ensemble: greedy corr<0.85 selection → equal-weight blend. + O4 — Factor-mix parameter sweep on the FactorCombo "up_cap+mom_gap" signal + (top_n × rebal_freq). + +All experiments run on PIT-masked data. Results printed + written to +data/factor_optimize_.csv. + +Usage: + uv run python -m research.factor_optimize +""" + +from __future__ import annotations + +import os +import warnings + +import numpy as np +import pandas as pd + +import research.pit_backtest as pit +from research.strategies_plus import (EnsembleStrategy, RecoveryMomentumPlus, + spy_ma200_filter) +from strategies.factor_combo import FactorComboStrategy, SIGNAL_REGISTRY +from strategies.momentum_quality import MomentumQualityStrategy +from strategies.recovery_momentum import RecoveryMomentumStrategy + +warnings.filterwarnings("ignore", category=FutureWarning) + +DATA_DIR = "data" +BENCHMARK = "SPY" + + +def load_masked_prices(): + raw = pit.load_pit_prices() + masked = pit.pit_universe(raw) + if BENCHMARK in raw.columns: + masked[BENCHMARK] = raw[BENCHMARK] + return masked + + +def slice_period(df, start=None, end=None): + out = df + if start: + out = out[out.index >= start] + if end: + out = out[out.index <= end] + return out + + +def run(strat, prices, *, regime_filter=None): + return pit.backtest( + strategy=strat, prices=prices, initial_capital=10_000, + transaction_cost=0.001, regime_filter=regime_filter, + ) + + +# --------------------------------------------------------------------------- +# O1 — RecoveryMomentumPlus hyperparameter grid +# --------------------------------------------------------------------------- + +def o1_hyperparam_sweep(masked): + print("\n" + "=" * 100) + print("O1 — RecoveryMomentumPlus sweep (train 2016-2022 / test 2023-2026)") + print("=" * 100) + tickers = [c for c in masked.columns if c != BENCHMARK] + prices = masked[tickers] + train = slice_period(prices, "2016-04-19", "2022-12-31") + test = slice_period(prices, "2023-01-01", None) + + grid = [] + for top_n in (5, 10, 15, 20): + for rec_win in (42, 63, 126): + for rec_w in (0.3, 0.5, 0.7): + for rebal in (5, 10, 21): + grid.append((top_n, rec_win, rec_w, rebal)) + + rows = [] + for i, (top_n, rec_win, rec_w, rebal) in enumerate(grid, 1): + cfg = dict(top_n=top_n, recovery_window=rec_win, + rec_weight=rec_w, rebal_freq=rebal) + tr = pit.summarize(run(RecoveryMomentumPlus(**cfg), train), "") + te = pit.summarize(run(RecoveryMomentumPlus(**cfg), test), "") + rows.append({**cfg, + "train_CAGR": tr["CAGR"], "train_Sharpe": tr["Sharpe"], + "test_CAGR": te["CAGR"], "test_Sharpe": te["Sharpe"], + "test_MaxDD": te["MaxDD"], "test_Calmar": te["Calmar"]}) + if i % 12 == 0 or i == len(grid): + print(f" … {i}/{len(grid)} configs evaluated") + + df = pd.DataFrame(rows).sort_values("test_Sharpe", ascending=False) + out = os.path.join(DATA_DIR, "factor_optimize_O1.csv") + df.to_csv(out, index=False) + + print("\n --- Top 10 by out-of-sample Sharpe (2023-2026) ---") + disp = ["top_n", "recovery_window", "rec_weight", "rebal_freq", + "train_Sharpe", "test_Sharpe", "train_CAGR", "test_CAGR", + "test_MaxDD", "test_Calmar"] + print(df.head(10)[disp].to_string(index=False, formatters={ + "train_Sharpe": "{:.2f}".format, "test_Sharpe": "{:.2f}".format, + "train_CAGR": "{:.1%}".format, "test_CAGR": "{:.1%}".format, + "test_MaxDD": "{:.1%}".format, "test_Calmar": "{:.2f}".format, + })) + return df + + +# --------------------------------------------------------------------------- +# O2 — Regime filter on the top strategies +# --------------------------------------------------------------------------- + +def o2_regime(masked): + print("\n" + "=" * 100) + print("O2 — SPY > MA regime filter on top strategies (full 10y PIT)") + print("=" * 100) + tickers = [c for c in masked.columns if c != BENCHMARK] + prices = masked[tickers] + + spy_full = masked[BENCHMARK].dropna() + + contenders = { + "Recovery+Mom Top10": RecoveryMomentumStrategy(top_n=10), + "fc_up_cap_mom_gap_monthly": FactorComboStrategy("up_cap+mom_gap", + rebal_freq=21, top_n=10), + "fc_rec63_mom_gap_monthly": FactorComboStrategy("rec63+mom_gap", + rebal_freq=21, top_n=10), + } + + rows = [] + for name, strat in contenders.items(): + base = run(strat, prices) + rows.append({"strategy": name, "filter": "none", + **{k: v for k, v in pit.summarize(base, "").items() if k != "name"}}) + for ma in (200, 150, 100): + filt = spy_ma200_filter(spy_full, ma_window=ma).reindex(prices.index).fillna(False) + strat_fresh = _fresh_copy(strat) + eq = run(strat_fresh, prices, regime_filter=filt) + rows.append({"strategy": name, "filter": f"SPY>MA{ma}", + **{k: v for k, v in pit.summarize(eq, "").items() if k != "name"}}) + + df = pd.DataFrame(rows) + df.to_csv(os.path.join(DATA_DIR, "factor_optimize_O2.csv"), index=False) + + print(f" {'strategy':<32s} {'filter':<12s} {'CAGR':>7s} {'Sharpe':>7s} " + f"{'MaxDD':>7s} {'Calmar':>7s}") + for _, r in df.iterrows(): + print(f" {r['strategy']:<32s} {r['filter']:<12s} " + f"{r['CAGR']*100:>6.1f}% {r['Sharpe']:>7.2f} " + f"{r['MaxDD']*100:>6.1f}% {r['Calmar']:>7.2f}") + return df + + +def _fresh_copy(strat): + """Re-instantiate a strategy so state (if any) is reset between backtests.""" + if isinstance(strat, RecoveryMomentumStrategy): + return RecoveryMomentumStrategy( + recovery_window=strat.recovery_window, mom_lookback=strat.mom_lookback, + mom_skip=strat.mom_skip, rebal_freq=strat.rebal_freq, top_n=strat.top_n) + if isinstance(strat, FactorComboStrategy): + return FactorComboStrategy(strat.signal_name, rebal_freq=strat.rebal_freq, + top_n=strat.top_n) + if isinstance(strat, MomentumQualityStrategy): + return MomentumQualityStrategy( + momentum_period=strat.momentum_period, skip=strat.skip, + quality_window=strat.quality_window, top_n=strat.top_n) + return strat # already stateless for our uses + + +# --------------------------------------------------------------------------- +# O3 — Uncorrelated ensemble +# --------------------------------------------------------------------------- + +def o3_ensemble(masked): + print("\n" + "=" * 100) + print("O3 — Greedy uncorrelated ensemble (full 10y PIT)") + print("=" * 100) + tickers = [c for c in masked.columns if c != BENCHMARK] + prices = masked[tickers] + spy_full = masked[BENCHMARK].dropna() + + # Candidate pool: the production strategies that cleared 0.75 Sharpe in 10y sweep. + candidates: list[tuple[str, object]] = [ + ("Recovery+Mom Top10", RecoveryMomentumStrategy(top_n=10)), + ("fc_up_cap_mom_gap_monthly", FactorComboStrategy("up_cap+mom_gap", 21, 10)), + ("fc_rec63_mom_gap_monthly", FactorComboStrategy("rec63+mom_gap", 21, 10)), + ("fc_up_cap_quality_mom_monthly", FactorComboStrategy("up_cap+quality_mom", 21, 10)), + ("fc_rec_mfilt_deep_upvol_monthly", FactorComboStrategy("rec_mfilt+deep_upvol", 21, 10)), + ("fc_mom7m_rec126_monthly", FactorComboStrategy("mom7m+rec126", 21, 10)), + ("Recovery+Mom Top20", RecoveryMomentumStrategy(top_n=20)), + ("fc_down_resil_qual_mom_monthly", FactorComboStrategy("down_resil+qual_mom", 21, 10)), + ] + + equities: dict[str, pd.Series] = {name: run(s, prices) for name, s in candidates} + returns = pd.DataFrame({n: eq.pct_change().fillna(0) for n, eq in equities.items()}) + sharpes = {n: pit.summarize(eq, n)["Sharpe"] for n, eq in equities.items()} + order = sorted(candidates, key=lambda t: sharpes[t[0]], reverse=True) + + picked_names: list[str] = [] + picked: list[tuple[object, float]] = [] + for name, strat in order: + if any(returns[name].corr(returns[p]) > 0.85 for p in picked_names): + continue + picked_names.append(name) + picked.append((strat, 1.0)) + if len(picked) >= 3: + break + + print(f" Selected {len(picked)} uncorrelated components:") + for name in picked_names: + print(f" - {name} (Sharpe={sharpes[name]:.2f})") + + ens = EnsembleStrategy(picked) + eq_ens = run(ens, prices) + filt = spy_ma200_filter(spy_full).reindex(prices.index).fillna(False) + eq_ens_reg = run(EnsembleStrategy(picked), prices, regime_filter=filt) + + spy_bh = (masked[BENCHMARK].dropna().pipe(lambda s: s / s.iloc[0] * 10_000)) + rows = [pit.summarize(spy_bh, "SPY buy-and-hold")] + for name in picked_names: + rows.append(pit.summarize(equities[name], f" component: {name}")) + rows.append(pit.summarize(eq_ens, "ENSEMBLE (equal-weight, no filter)")) + rows.append(pit.summarize(eq_ens_reg, "ENSEMBLE + SPY>MA200 filter")) + for r in rows: + print(pit.fmt_row(r)) + + df = pd.DataFrame(rows) + df.to_csv(os.path.join(DATA_DIR, "factor_optimize_O3.csv"), index=False) + return df, picked_names + + +# --------------------------------------------------------------------------- +# O4 — FactorCombo up_cap+mom_gap: top_n × rebal sweep +# --------------------------------------------------------------------------- + +def o4_factorcombo_sweep(masked): + print("\n" + "=" * 100) + print("O4 — FactorCombo up_cap+mom_gap: top_n × rebal (full 10y PIT)") + print("=" * 100) + tickers = [c for c in masked.columns if c != BENCHMARK] + prices = masked[tickers] + + rows = [] + for top_n in (5, 8, 10, 15, 20, 30): + for rebal in (5, 10, 21, 42): + strat = FactorComboStrategy("up_cap+mom_gap", rebal_freq=rebal, top_n=top_n) + eq = run(strat, prices) + s = pit.summarize(eq, f"top_n={top_n} rebal={rebal}") + rows.append({"top_n": top_n, "rebal": rebal, + "CAGR": s["CAGR"], "Sharpe": s["Sharpe"], + "MaxDD": s["MaxDD"], "Calmar": s["Calmar"]}) + + df = pd.DataFrame(rows).sort_values("Sharpe", ascending=False) + df.to_csv(os.path.join(DATA_DIR, "factor_optimize_O4.csv"), index=False) + + print(f" {'top_n':<8s}{'rebal':<8s}{'CAGR':>8s}{'Sharpe':>9s}" + f"{'MaxDD':>9s}{'Calmar':>9s}") + for _, r in df.iterrows(): + print(f" {int(r['top_n']):<8d}{int(r['rebal']):<8d}" + f"{r['CAGR']*100:>7.1f}%{r['Sharpe']:>9.2f}" + f"{r['MaxDD']*100:>8.1f}%{r['Calmar']:>9.2f}") + return df + + +def main(): + print("Loading PIT-masked price data…") + masked = load_masked_prices() + print(f" shape={masked.shape} range={masked.index[0].date()} → {masked.index[-1].date()}") + + o1 = o1_hyperparam_sweep(masked) + o2 = o2_regime(masked) + o3, picks = o3_ensemble(masked) + o4 = o4_factorcombo_sweep(masked) + + print("\n" + "=" * 100) + print("Summary: best config from each experiment") + print("=" * 100) + best_o1 = o1.iloc[0] + print(f" O1 best OOS Sharpe: top_n={int(best_o1['top_n'])} rec_win={int(best_o1['recovery_window'])} " + f"rec_w={best_o1['rec_weight']} rebal={int(best_o1['rebal_freq'])} " + f"→ test Sharpe={best_o1['test_Sharpe']:.2f} test CAGR={best_o1['test_CAGR']*100:.1f}%") + best_o4 = o4.iloc[0] + print(f" O4 best overall: top_n={int(best_o4['top_n'])} rebal={int(best_o4['rebal'])} " + f"Sharpe={best_o4['Sharpe']:.2f} CAGR={best_o4['CAGR']*100:.1f}% " + f"Calmar={best_o4['Calmar']:.2f}") + + +if __name__ == "__main__": + main() diff --git a/research/interaction_alpha.py b/research/interaction_alpha.py new file mode 100644 index 0000000..1d245d9 --- /dev/null +++ b/research/interaction_alpha.py @@ -0,0 +1,233 @@ +""" +Interaction / multiplicative factor strategy. + +Rationale: in the 10y PIT diagnostics, each single-factor top decile clocks +~0.5–0.8 Sharpe, yet the production Recovery+Mom Top10 delivers 0.92. The +extra alpha comes from an AND-style interaction — stocks that rank high on +BOTH factors simultaneously. Linear rank-blending loses this because a stock +can make top_n by being middling on many factors instead of extreme on a few. + +This module provides: + + * `MultiplicativeFactorStrategy` — picks top_n stocks by the geometric mean + (equivalently the product) of cross-sectional factor ranks. Concentrates + on consensus winners. + + * `VotingFactorStrategy` — counts how many factors place a stock in its + top `vote_pct`; selects stocks clearing a minimum vote threshold. Breaks + ties by the sum of ranks. Robust when factor ICs drift. + + * `SubStrategyEnsemble` — equal-weight blend of Recovery+Mom Top10, + fc_up_cap+mom_gap monthly, and a new Multiplicative("mom × recovery × + idio_vol_neg") sleeve. Diversifies across independent alpha sources + rather than across factor primitives. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd + +from research.alpha_factors import (_rolling_beta_and_residvol, f_mom_12_1, + f_mom_7_1, f_rev_1m, f_w52_high, f_max5_neg, + f_recovery_63, f_trend_strength, xsec_rank, + f_mom_residual) +from strategies.base import Strategy +from strategies.factor_combo import FactorComboStrategy +from strategies.recovery_momentum import RecoveryMomentumStrategy + + +# --------------------------------------------------------------------------- +# Multiplicative top-N +# --------------------------------------------------------------------------- + +class MultiplicativeFactorStrategy(Strategy): + """ + Top-N by product of selected factor ranks (equivalent to rank-geometric-mean). + + Parameters + ---------- + factor_names : list[str] + Keys into the factor library. Supported: + mom_12_1, mom_7_1, mom_residual, recovery_63, w52_high, + idio_vol_neg, mom_x_recovery (shortcut pair). + top_n : int + Number of stocks. + rebal_freq : int + Rebal interval in trading days. + mkt_returns : pd.Series | None + Required for mom_residual / idio_vol_neg. + """ + + def __init__(self, factor_names: list[str], top_n: int = 10, + rebal_freq: int = 21, mkt_returns: pd.Series | None = None, + weighting: str = "equal", signal_concentration: float = 0.0, + dispersion_scale: bool = False): + """ + Parameters + ---------- + signal_concentration : float + Exponent applied to composite score when weighting=='signal'. + 0 → equal weight within top_n; higher → more weight on top ranks. + dispersion_scale : bool + Scale total exposure by z-scored cross-sectional rank dispersion, + clipped to [0.5, 1.3]. Expands in high-dispersion regimes. + """ + self.factor_names = factor_names + self.top_n = top_n + self.rebal_freq = rebal_freq + self.mkt_returns = mkt_returns + self.weighting = weighting + self.signal_concentration = signal_concentration + self.dispersion_scale = dispersion_scale + + def _build(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]: + betas, resid_vol = (None, None) + if any(f in ("mom_residual", "idio_vol_neg", "low_beta") for f in self.factor_names): + if self.mkt_returns is None: + raise ValueError("mkt_returns required for beta-based factors") + betas, resid_vol = _rolling_beta_and_residvol(data, self.mkt_returns, 60) + lib = { + "mom_12_1": lambda: f_mom_12_1(data), + "mom_7_1": lambda: f_mom_7_1(data), + "mom_residual": lambda: f_mom_residual(data, self.mkt_returns, betas=betas), + "recovery_63": lambda: f_recovery_63(data), + "w52_high": lambda: f_w52_high(data), + "idio_vol_neg": lambda: -resid_vol, + "low_beta": lambda: -betas, + "trend": lambda: f_trend_strength(data), + } + return {n: lib[n]() for n in self.factor_names} + + def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: + factors = self._build(data) + ranks = {n: xsec_rank(v) for n, v in factors.items()} + + # Product of ranks. If any rank is NaN, product is NaN → row excluded. + composite = None + for rk in ranks.values(): + composite = rk if composite is None else composite.mul(rk, fill_value=np.nan) + composite = composite.where(~rk.isna(), np.nan) + + sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom") + n_valid = composite.notna().sum(axis=1) + enough = n_valid >= self.top_n + top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1) + + if self.weighting == "equal": + raw = top_mask.astype(float) + elif self.weighting == "inv_vol": + vol = data.pct_change(fill_method=None).rolling(60).std() + raw = (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0) + elif self.weighting == "signal": + # Weight ∝ composite^concentration, only among top_mask picks. + score = composite.where(top_mask, 0.0).fillna(0.0) + raw = score ** max(self.signal_concentration, 1.0) + else: + raise ValueError(f"bad weighting {self.weighting!r}") + + row_sums = raw.sum(axis=1).replace(0, np.nan) + weights = raw.div(row_sums, axis=0).fillna(0.0) + + warmup = 252 + rebal_mask = pd.Series(False, index=data.index) + rebal_mask.iloc[list(range(warmup, len(data), self.rebal_freq))] = True + weights[~rebal_mask] = np.nan + weights = weights.ffill().fillna(0.0) + weights.iloc[:warmup] = 0.0 + + if self.dispersion_scale: + # Cross-sectional rank dispersion = daily std of composite. Scale + # exposure up in high-dispersion regimes (alpha opportunity richer). + disp = composite.std(axis=1) + z = (disp - disp.rolling(252, min_periods=126).mean()) \ + / disp.rolling(252, min_periods=126).std() + scale = (1.0 + 0.3 * z.clip(-1, 1)).clip(0.5, 1.3) + scale = scale.reindex(weights.index).fillna(1.0) + weights = weights.mul(scale, axis=0) + + return weights.shift(1).fillna(0.0) + + +# --------------------------------------------------------------------------- +# Voting top-N +# --------------------------------------------------------------------------- + +class VotingFactorStrategy(Strategy): + """ + Top-N by vote count: each factor contributes 1 vote if a stock is in its + top `vote_pct` percentile. Select stocks with vote_count ≥ min_votes, + break ties by sum of ranks. + """ + + def __init__(self, factor_names: list[str], top_n: int = 10, + rebal_freq: int = 21, vote_pct: float = 0.25, + min_votes: int = 3, mkt_returns: pd.Series | None = None): + self.factor_names = factor_names + self.top_n = top_n + self.rebal_freq = rebal_freq + self.vote_pct = vote_pct + self.min_votes = min_votes + self.mkt_returns = mkt_returns + + def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: + builder = MultiplicativeFactorStrategy( + factor_names=self.factor_names, top_n=self.top_n, + rebal_freq=self.rebal_freq, mkt_returns=self.mkt_returns) + factors = builder._build(data) + ranks = {n: xsec_rank(v) for n, v in factors.items()} + thresh = 1 - self.vote_pct + votes = sum((rk >= thresh).astype(float) for rk in ranks.values()) + rank_sum = sum(rk.fillna(0) for rk in ranks.values()) + + # Primary sort: vote count; tiebreaker: rank_sum. Build a composite. + composite = votes + rank_sum / (len(ranks) * 10) + composite = composite.where(votes >= self.min_votes, np.nan) + + sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom") + n_valid = composite.notna().sum(axis=1) + enough = n_valid >= 1 + effective_n = n_valid.clip(upper=self.top_n) + top_mask = (sel_rank <= effective_n.values.reshape(-1, 1)) & enough.values.reshape(-1, 1) + + raw = top_mask.astype(float) + row_sums = raw.sum(axis=1).replace(0, np.nan) + weights = raw.div(row_sums, axis=0).fillna(0.0) + + warmup = 252 + rebal_mask = pd.Series(False, index=data.index) + rebal_mask.iloc[list(range(warmup, len(data), self.rebal_freq))] = True + weights[~rebal_mask] = np.nan + weights = weights.ffill().fillna(0.0) + weights.iloc[:warmup] = 0.0 + return weights.shift(1).fillna(0.0) + + +# --------------------------------------------------------------------------- +# Sub-strategy ensemble +# --------------------------------------------------------------------------- + +class SubStrategyEnsemble(Strategy): + """Equal-weight blend of several long-only sub-strategies.""" + + def __init__(self, sub_strats: list[Strategy]): + self.sub_strats = sub_strats + self.w = 1.0 / len(sub_strats) + + def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: + out = None + for strat in self.sub_strats: + sig = strat.generate_signals(data) * self.w + out = sig if out is None else out.add(sig, fill_value=0.0) + return out + + +def default_ensemble(mkt_returns: pd.Series) -> SubStrategyEnsemble: + return SubStrategyEnsemble([ + RecoveryMomentumStrategy(top_n=10), + FactorComboStrategy("up_cap+mom_gap", rebal_freq=21, top_n=10), + MultiplicativeFactorStrategy( + factor_names=["mom_12_1", "recovery_63", "idio_vol_neg"], + top_n=10, rebal_freq=21, mkt_returns=mkt_returns, + ), + ]) diff --git a/research/run_interaction.py b/research/run_interaction.py new file mode 100644 index 0000000..1658877 --- /dev/null +++ b/research/run_interaction.py @@ -0,0 +1,136 @@ +""" +Evaluate interaction/ensemble strategies on 1/3/5/10y PIT windows with a +proper 500-day warmup preload, so 252d-warmup strategies are active from the +measurement start. +""" + +from __future__ import annotations + +import os +import warnings + +import pandas as pd + +import research.pit_backtest as pit +from research.alpha_factors import AlphaFactorStrategy +from research.interaction_alpha import (MultiplicativeFactorStrategy, + SubStrategyEnsemble, VotingFactorStrategy, + default_ensemble) +from strategies.factor_combo import FactorComboStrategy +from strategies.recovery_momentum import RecoveryMomentumStrategy + +warnings.filterwarnings("ignore", category=FutureWarning) +warnings.filterwarnings("ignore", category=RuntimeWarning) + +DATA_DIR = "data" +BENCHMARK = "SPY" + + +def load(): + raw = pit.load_pit_prices() + masked = pit.pit_universe(raw) + masked[BENCHMARK] = raw[BENCHMARK] + return masked + + +def warmup_slice(df, years, warmup_days=500): + measurement_start = df.index[-1] - pd.DateOffset(years=years) + cutoff = max(df.index[0], measurement_start - pd.Timedelta(days=warmup_days * 1.5)) + return df[df.index >= cutoff], measurement_start + + +def measure(eq, start, name=""): + eq = eq[eq.index >= start] + eq = eq / eq.iloc[0] * 10_000 + return pit.summarize(eq, name=name) + + +def make_configs(mkt_ret): + pair = ["mom_12_1", "recovery_63"] + return { + "Mult(mom12×rec63) eq, tn=10": + lambda: MultiplicativeFactorStrategy( + factor_names=pair, top_n=10, rebal_freq=21, mkt_returns=mkt_ret), + "Mult(mom12×rec63) eq, tn=15": + lambda: MultiplicativeFactorStrategy( + factor_names=pair, top_n=15, rebal_freq=21, mkt_returns=mkt_ret), + "Mult(mom12×rec63) eq, rebal=10": + lambda: MultiplicativeFactorStrategy( + factor_names=pair, top_n=10, rebal_freq=10, mkt_returns=mkt_ret), + "Mult(mom12×rec63) sig^2, tn=15": + lambda: MultiplicativeFactorStrategy( + factor_names=pair, top_n=15, rebal_freq=21, mkt_returns=mkt_ret, + weighting="signal", signal_concentration=2.0), + "Mult(mom12×rec63) sig^4, tn=15": + lambda: MultiplicativeFactorStrategy( + factor_names=pair, top_n=15, rebal_freq=21, mkt_returns=mkt_ret, + weighting="signal", signal_concentration=4.0), + "Mult(mom12×rec63) disp-scale": + lambda: MultiplicativeFactorStrategy( + factor_names=pair, top_n=10, rebal_freq=21, mkt_returns=mkt_ret, + dispersion_scale=True), + "Mult(mom12×rec63) inv_vol": + lambda: MultiplicativeFactorStrategy( + factor_names=pair, top_n=10, rebal_freq=21, mkt_returns=mkt_ret, + weighting="inv_vol"), + "Ensemble3 (RM/upcap/mult)": + lambda: default_ensemble(mkt_ret), + "Recovery+Mom Top10": + lambda: RecoveryMomentumStrategy(top_n=10), + "fc_up_cap+mom_gap": + lambda: FactorComboStrategy("up_cap+mom_gap", rebal_freq=21, top_n=10), + } + + +def main(): + print("Loading PIT data…") + masked = load() + tickers = [c for c in masked.columns if c != BENCHMARK] + mkt_ret = masked[BENCHMARK].pct_change(fill_method=None) + print(f" shape={masked.shape} range={masked.index[0].date()} → {masked.index[-1].date()}") + + rows = [] + for years in (10, 5, 3, 1): + sliced, start = warmup_slice(masked, years, warmup_days=500) + prices = sliced[tickers] + print(f"\n--- {years}y window " + f"(measure {start.date()} → {sliced.index[-1].date()}, " + f"warmup from {sliced.index[0].date()}) ---") + spy = sliced[BENCHMARK].dropna() + spy_eq = (spy / spy.iloc[0]) * 10_000 + m = measure(spy_eq, start, "") + rows.append({"years": years, "strategy": "SPY buy-and-hold", + **{k: v for k, v in m.items() if k != "name"}}) + configs = make_configs(mkt_ret) + for name, factory in configs.items(): + strat = factory() + eq = pit.backtest(strategy=strat, prices=prices, + initial_capital=10_000, transaction_cost=0.001) + m = measure(eq, start, "") + rows.append({"years": years, "strategy": name, + **{k: v for k, v in m.items() if k != "name"}}) + tail = [r for r in rows if r["years"] == years] + tail.sort(key=lambda r: r["Sharpe"], reverse=True) + for r in tail: + print(f" {r['strategy']:<34s} CAGR={r['CAGR']*100:>6.1f}% " + f"Sharpe={r['Sharpe']:>5.2f} Sortino={r['Sortino']:>5.2f} " + f"MaxDD={r['MaxDD']*100:>6.1f}% Calmar={r['Calmar']:>5.2f}") + + df = pd.DataFrame(rows) + df.to_csv(os.path.join(DATA_DIR, "interaction_results.csv"), index=False) + + print("\n=== Cross-window CAGR summary (sorted by 10y Sharpe) ===") + pv = df.pivot(index="strategy", columns="years", values="CAGR") + pv.columns = [f"CAGR_{y}y" for y in pv.columns] + sh10 = df[df["years"] == 10].set_index("strategy")["Sharpe"] + pv["Sharpe_10y"] = sh10 + pv = pv.sort_values("Sharpe_10y", ascending=False) + print(pv.to_string(formatters={ + "CAGR_10y": "{:.1%}".format, "CAGR_5y": "{:.1%}".format, + "CAGR_3y": "{:.1%}".format, "CAGR_1y": "{:.1%}".format, + "Sharpe_10y": "{:.2f}".format, + })) + + +if __name__ == "__main__": + main() diff --git a/research/strategy_sweep.py b/research/strategy_sweep.py new file mode 100644 index 0000000..cb8b115 --- /dev/null +++ b/research/strategy_sweep.py @@ -0,0 +1,145 @@ +""" +Unified 3/5/10-year PIT backtest for every production strategy. + +Runs the full strategy roster against the point-in-time S&P 500 price matrix +from research/pit_backtest and reports CAGR / Sharpe / Sortino / MaxDD / Calmar +for three trailing windows. Results are written to data/sweep_y.csv and +printed to stdout. + +Usage: + uv run python -m research.strategy_sweep +""" + +import os + +import pandas as pd + +import research.pit_backtest as pit +from strategies.adaptive_momentum import AdaptiveMomentumStrategy +from strategies.dual_momentum import DualMomentumStrategy +from strategies.factor_combo import SIGNAL_REGISTRY, FactorComboStrategy +from strategies.inverse_vol import InverseVolatilityStrategy +from strategies.mean_reversion import MeanReversionStrategy +from strategies.momentum import MomentumStrategy +from strategies.momentum_quality import MomentumQualityStrategy +from strategies.multi_factor import MultiFactorStrategy +from strategies.recovery_momentum import RecoveryMomentumStrategy +from strategies.trend_following import TrendFollowingStrategy + +DATA_DIR = "data" +BENCHMARK = "SPY" + + +def build_strategies(tickers: list[str]) -> dict: + """Instantiate every production strategy; returns {name: strategy}.""" + top_n = max(5, len(tickers) // 10) + strategies: dict = { + # --- Baselines --- + "SPY buy-and-hold": None, # handled separately + "Momentum": MomentumStrategy(lookback=252, skip=21, top_n=top_n), + "Inverse Volatility": InverseVolatilityStrategy(vol_window=20), + "Multi-Factor": MultiFactorStrategy(tickers=tickers, benchmark=BENCHMARK, + top_n=top_n), + "Mean Reversion": MeanReversionStrategy(top_n=top_n), + "Trend Following": TrendFollowingStrategy(ma_window=150, momentum_period=126, + top_n=top_n), + "Dual Momentum": DualMomentumStrategy(top_n=top_n), + "Momentum+Quality": MomentumQualityStrategy(momentum_period=252, skip=21, + top_n=top_n), + "Mom+InvVol": AdaptiveMomentumStrategy(top_n=top_n), + "Recovery+Mom Top20": RecoveryMomentumStrategy(top_n=min(20, top_n)), + "Recovery+Mom Top10": RecoveryMomentumStrategy(top_n=10), + } + # Factor-combo (monthly rebalance; biweekly is the other interesting one, + # but monthly aligns with how the RecoveryMomentum defaults are set). + for name in SIGNAL_REGISTRY: + key = f"fc_{name.replace('+', '_').replace('×', 'x')}_monthly" + strategies[key] = FactorComboStrategy(name, rebal_freq=21, top_n=10) + return strategies + + +def slice_years(prices: pd.DataFrame, years: int) -> pd.DataFrame: + cutoff = prices.index[-1] - pd.DateOffset(years=years) + return prices[prices.index >= cutoff] + + +def run_one(name: str, strat, prices: pd.DataFrame, + tickers: list[str]) -> dict: + if strat is None: + # SPY buy-and-hold + spy = prices[BENCHMARK].dropna() + eq = (spy / spy.iloc[0]) * 10_000 + return {"strategy": name, **{k: v for k, v in pit.summarize(eq, name=name).items() + if k != "name"}} + # MultiFactor needs the benchmark column → pass full `prices`; others only tickers. + if isinstance(strat, MultiFactorStrategy): + strat_prices = prices # keep SPY column + else: + strat_prices = prices[tickers] + eq = pit.backtest(strategy=strat, prices=strat_prices, initial_capital=10_000, + transaction_cost=0.001) + return {"strategy": name, **{k: v for k, v in pit.summarize(eq, name=name).items() + if k != "name"}} + + +def fmt(row: dict) -> str: + return (f" {row['strategy']:<44s} " + f"CAGR={row['CAGR']*100:>6.1f}% " + f"Sharpe={row['Sharpe']:>5.2f} " + f"Sortino={row['Sortino']:>5.2f} " + f"MaxDD={row['MaxDD']*100:>6.1f}% " + f"Calmar={row['Calmar']:>5.2f}") + + +def main() -> None: + print("Loading point-in-time price data…") + raw = pit.load_pit_prices() + masked = pit.pit_universe(raw) + # Preserve SPY even though it's not in the membership intervals. + if BENCHMARK in raw.columns: + masked[BENCHMARK] = raw[BENCHMARK] + tickers = [c for c in masked.columns if c != BENCHMARK] + print(f" tickers={len(tickers)} rows={len(masked)} " + f"range={masked.index[0].date()}→{masked.index[-1].date()}") + + all_results: dict[int, pd.DataFrame] = {} + for years in (10, 5, 3): + sliced = slice_years(masked, years) + strategies = build_strategies(tickers) + print("\n" + "=" * 110) + print(f"Window = last {years} years ({sliced.index[0].date()} → {sliced.index[-1].date()})") + print("=" * 110) + rows = [] + for name, strat in strategies.items(): + try: + rows.append(run_one(name, strat, sliced, tickers)) + except Exception as exc: # noqa: BLE001 + print(f" [skip] {name}: {type(exc).__name__}: {exc}") + continue + df = pd.DataFrame(rows).sort_values("Sharpe", ascending=False) + for _, r in df.iterrows(): + print(fmt(r)) + out = os.path.join(DATA_DIR, f"sweep_{years}y.csv") + df.to_csv(out, index=False) + all_results[years] = df + print(f" → saved {out}") + + # Cross-window comparison: only strategies present in all windows. + print("\n" + "=" * 110) + print("Cross-window CAGR comparison (sorted by 10y Sharpe)") + print("=" * 110) + pivot = pd.DataFrame({ + f"CAGR_{y}y": all_results[y].set_index("strategy")["CAGR"] + for y in (10, 5, 3) + }) + sharpe10 = all_results[10].set_index("strategy")["Sharpe"] + pivot["Sharpe_10y"] = sharpe10 + pivot = pivot.sort_values("Sharpe_10y", ascending=False) + print(pivot.to_string(formatters={ + "CAGR_10y": "{:.1%}".format, "CAGR_5y": "{:.1%}".format, + "CAGR_3y": "{:.1%}".format, "Sharpe_10y": "{:.2f}".format, + })) + + +if __name__ == "__main__": + main()