"""Three directions to break V7+VT36's ceiling (61% Ann, Sharpe 1.89). Direction A — Multi-strategy ensemble: V7 + stock pickers, capital split. Direction B — Cross-market V7: sector 3x ETFs (SOXL, TECL, TNA, FAS). Direction C — Improved regime engine: alternative signals replacing MA150. """ from __future__ import annotations import sys sys.path.insert(0, ".") import numpy as np import pandas as pd import data_manager import metrics import universe_history as uh from main import backtest from strategies.base import Strategy from strategies.permanent import TrendRiderV3 from strategies.trend_rider_v7 import TrendRiderV7 from strategies.recovery_momentum import RecoveryMomentumStrategy from universe import UNIVERSES YEARS = 10 CAPITAL = 100_000 TX_COST = 0.001 FIXED_FEE = 2.0 def run_and_report(label, strategy, data_panel, capital=CAPITAL): eq = backtest(strategy, data_panel, initial_capital=capital, transaction_cost=TX_COST, fixed_fee=FIXED_FEE) m = metrics.raw_summary(eq) return label, eq, m def print_table(results: list[tuple[str, pd.Series, dict]]): print(f"{'#':<4} {'Strategy':<52} {'Ann%':>7} {'Vol%':>7} {'Sharpe':>7} " f"{'Sortino':>8} {'MaxDD%':>7} {'Calmar':>7}") print("-" * 115) for i, (label, _, m) in enumerate(results, 1): marker = " ★" if i <= 3 else "" print(f"{i:<4} {label:<52} " f"{m['annualizedReturn']*100:>6.1f}% " f"{m['annualizedVolatility']*100:>6.1f}% " f"{m['sharpeRatio']:>7.2f} " f"{m['sortinoRatio']:>8.2f} " f"{m['maxDrawdown']*100:>6.1f}% " f"{m['calmarRatio']:>7.2f}{marker}") def ensemble_equity(equities: list[pd.Series], weights: list[float] | None = None ) -> pd.Series: """Combine independent equity curves with periodic rebalancing. Each equity is assumed to start at $CAPITAL. Returns combined equity as if capital were split according to weights. """ if weights is None: weights = [1.0 / len(equities)] * len(equities) idx = equities[0].index for eq in equities[1:]: idx = idx.intersection(eq.index) aligned = [eq.reindex(idx).ffill() for eq in equities] # Combine as weighted sum of normalized curves combined = pd.Series(0.0, index=idx) for eq, w in zip(aligned, weights): combined += (eq / eq.iloc[0]) * w combined = combined * CAPITAL return combined # ========================================================================= # Data loading (shared) # ========================================================================= def load_all_data(): print("=" * 100) print(" LOADING ALL DATA") print("=" * 100) # S&P 500 + PIT universe = UNIVERSES["us"] tickers = universe["fetch"]() pit_intervals = uh.load_sp500_history() hist_tickers = uh.all_tickers_ever(pit_intervals) # All ETFs needed across all three directions core_etfs = ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY", "TLT"] sector_etfs = [ "SOXL", "SMH", # 3x semi / semi index "TECL", "XLK", # 3x tech / tech sector "TNA", "IWM", # 3x Russell 2000 "FAS", "XLF", # 3x financials ] regime_etfs = ["VIX", "^VIX"] # VIX for alt regime signals all_etfs = sorted(set(core_etfs + sector_etfs + regime_etfs)) # Stock data (includes ETFs for mixed strategies) all_stock_tickers = sorted(set(tickers + hist_tickers + all_etfs)) print(f"\nDownloading {len(all_stock_tickers)} tickers...") stock_data = data_manager.update("us", all_stock_tickers, with_open=False) if isinstance(stock_data, tuple): stock_data = stock_data[0] cutoff = stock_data.index[-1] - pd.DateOffset(years=YEARS) stock_data = stock_data[stock_data.index >= cutoff] stock_data = uh.mask_prices(stock_data, pit_intervals) # Pure ETF data etf_data = data_manager.update("etfs", all_etfs, with_open=False) if isinstance(etf_data, tuple): etf_data = etf_data[0] etf_cutoff = etf_data.index[-1] - pd.DateOffset(years=YEARS) etf_data = etf_data[etf_data.index >= etf_cutoff] stock_tickers = [t for t in stock_data.columns if t not in all_etfs and stock_data[t].notna().any()] print(f"Stocks: {len(stock_tickers)}") print(f"Period: {stock_data.index[0].date()} → {stock_data.index[-1].date()}") print(f"ETF columns: {sorted(etf_data.columns.tolist())}") return stock_data, etf_data, stock_tickers, all_etfs # ========================================================================= # DIRECTION A: Multi-strategy ensemble # ========================================================================= def direction_a(stock_data, etf_data, stock_tickers, all_etfs): print("\n" + "=" * 100) print(" DIRECTION A: MULTI-STRATEGY ENSEMBLE") print("=" * 100) results = [] # Baselines etf_cols = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_data.columns] label, eq_v7, m = run_and_report( "V7+VT36 (baseline)", TrendRiderV7(target_vol=0.36, min_lev=0.75), etf_data[etf_cols]) results.append((label, eq_v7, m)) print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}") label, eq_rec, m = run_and_report( "RecoveryMom Top10 (baseline)", RecoveryMomentumStrategy(top_n=10), stock_data[stock_tickers]) results.append((label, eq_rec, m)) print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}") # Ensembles with different splits for v7_pct in (0.5, 0.6, 0.7, 0.8): stock_pct = 1.0 - v7_pct label = f"Ensemble {int(v7_pct*100)}% V7 + {int(stock_pct*100)}% RecMom" eq = ensemble_equity([eq_v7, eq_rec], [v7_pct, stock_pct]) m = metrics.raw_summary(eq) results.append((label, eq, m)) print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%") # Also try V7+VT36 + V7+VT24 (low-vol variant) ensemble label, eq_v7_lo, m = run_and_report( "V7+VT24 (low-vol)", TrendRiderV7(target_vol=0.24, min_lev=0.50), etf_data[etf_cols]) results.append((label, eq_v7_lo, m)) eq_v7_duo = ensemble_equity([eq_v7, eq_v7_lo], [0.6, 0.4]) m = metrics.raw_summary(eq_v7_duo) results.append(("Ensemble 60% V7-VT36 + 40% V7-VT24", eq_v7_duo, m)) print(f" V7-VT36/VT24 blend: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%") results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True) print(f"\n--- Direction A Results (sorted by Sharpe) ---") print_table(results) return results # ========================================================================= # DIRECTION B: Cross-market V7 (sector 3x ETFs) # ========================================================================= def direction_b(stock_data, etf_data, stock_tickers, all_etfs): print("\n" + "=" * 100) print(" DIRECTION B: CROSS-MARKET V7 (SECTOR 3x ETFs)") print("=" * 100) results = [] # Baseline etf_cols = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_data.columns] label, eq_v7, m = run_and_report( "V7+VT36 SPY→TQQQ/UPRO (baseline)", TrendRiderV7(target_vol=0.36, min_lev=0.75), etf_data[etf_cols]) results.append((label, eq_v7, m)) print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%") # Sector V7 instances sector_configs = [ ("SMH→SOXL (Semiconductor)", "SMH", ("SOXL",)), ("XLK→TECL (Technology)", "XLK", ("TECL",)), ("IWM→TNA (Russell 2000)", "IWM", ("TNA",)), ("XLF→FAS (Financials)", "XLF", ("FAS",)), ] sector_equities = {} for desc, signal, risk_on in sector_configs: needed = [signal] + list(risk_on) + ["GLD", "DBC", "SHY"] available = [t for t in needed if t in etf_data.columns] if signal not in available or not any(r in available for r in risk_on): print(f" SKIP {desc}: missing data ({signal} or {risk_on})") continue risk_on_avail = tuple(r for r in risk_on if r in available) strategy = TrendRiderV7( signal=signal, risk_on=risk_on_avail, risk_off=("GLD", "DBC"), target_vol=0.36, min_lev=0.75, ) label = f"V7+VT36 {desc}" try: _, eq, m = run_and_report(label, strategy, etf_data[available]) results.append((label, eq, m)) sector_equities[desc] = eq print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%") except Exception as e: print(f" FAILED {label}: {e}") # Cross-market ensembles if sector_equities: # All sectors + SPY equal weight all_eqs = [eq_v7] + list(sector_equities.values()) eq_all = ensemble_equity(all_eqs) m = metrics.raw_summary(eq_all) label = f"Equal-weight all {len(all_eqs)} V7 instances" results.append((label, eq_all, m)) print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%") # Best 2-3 combinations if "SMH→SOXL (Semiconductor)" in sector_equities: eq_spy_semi = ensemble_equity([eq_v7, sector_equities["SMH→SOXL (Semiconductor)"]], [0.5, 0.5]) m = metrics.raw_summary(eq_spy_semi) results.append(("50% SPY-V7 + 50% SOXL-V7", eq_spy_semi, m)) print(f" SPY+SOXL combo: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%") eq_spy_semi_70 = ensemble_equity([eq_v7, sector_equities["SMH→SOXL (Semiconductor)"]], [0.7, 0.3]) m = metrics.raw_summary(eq_spy_semi_70) results.append(("70% SPY-V7 + 30% SOXL-V7", eq_spy_semi_70, m)) if "XLK→TECL (Technology)" in sector_equities: eq_spy_tech = ensemble_equity([eq_v7, sector_equities["XLK→TECL (Technology)"]], [0.5, 0.5]) m = metrics.raw_summary(eq_spy_tech) results.append(("50% SPY-V7 + 50% TECL-V7", eq_spy_tech, m)) if len(sector_equities) >= 2: # SPY + top 2 sectors sorted_sectors = sorted(sector_equities.items(), key=lambda x: metrics.raw_summary(x[1])["sharpeRatio"], reverse=True) top2 = sorted_sectors[:2] eq_best3 = ensemble_equity([eq_v7] + [eq for _, eq in top2], [0.5] + [0.25] * 2) m = metrics.raw_summary(eq_best3) label = f"50% SPY-V7 + 25% {top2[0][0][:4]}.. + 25% {top2[1][0][:4]}.." results.append((label, eq_best3, m)) print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%") results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True) print(f"\n--- Direction B Results (sorted by Sharpe) ---") print_table(results) return results # ========================================================================= # DIRECTION C: Improved regime engine # ========================================================================= class V7AltRegime(Strategy): """V7 with pluggable regime function replacing V3._desired_regime.""" def __init__( self, regime_func, signal: str = "SPY", risk_on: tuple[str, ...] = ("TQQQ", "UPRO"), risk_off: tuple[str, ...] = ("GLD", "DBC"), target_vol: float = 0.36, vol_window: int = 60, min_lev: float = 0.75, max_lev: float = 1.0, pt_threshold: float = 0.30, pt_band: float = 0.10, pt_park: str = "SHY", ma_long: int = 150, mom_lookback: int = 63, min_hold: int = 15, ): self.regime_func = regime_func self.signal = signal self.risk_on = risk_on self.risk_off = risk_off self.target_vol = target_vol self.vol_window = vol_window self.min_lev = min_lev self.max_lev = max_lev self.pt_threshold = pt_threshold self.pt_band = pt_band self.pt_park = pt_park self.mom_lookback = mom_lookback self.min_hold = min_hold def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: cols = list({self.signal, *self.risk_on, *self.risk_off, self.pt_park}) cols = [c for c in cols if c in data.columns] w = pd.DataFrame(0.0, index=data.index, columns=cols) if self.signal not in data.columns: return w sig_arr = data[self.signal].to_numpy() sym_arrays = {s: data[s].to_numpy() for s in cols if s in data.columns} ron_syms = [s for s in self.risk_on if s in data.columns] roff_syms = [s for s in self.risk_off if s in data.columns] need = 252 regime: str | None = None bars = 0 def pick_best(basket, t): best_s, best_r = None, -np.inf for s in basket: arr = sym_arrays.get(s) if arr is None or t < self.mom_lookback + 1: continue if arr[t-1] <= 0 or np.isnan(arr[t-1]) or arr[t - self.mom_lookback - 1] <= 0: continue r = arr[t-1] / arr[t - self.mom_lookback - 1] - 1.0 if np.isfinite(r) and r > best_r: best_r, best_s = r, s return best_s for t in range(len(data)): if t < need: continue closes = sig_arr[:t] if np.isnan(closes[-1]): continue desired = self.regime_func(closes, regime) changed = False if regime is None: regime, bars, changed = desired, 0, True else: bars += 1 if desired != regime and bars >= self.min_hold: regime, bars, changed = desired, 0, True if not changed and (t - need) % 21 != 0: continue basket = ron_syms if regime == "risk_on" else roff_syms pick = pick_best(basket, t) if pick: w.iat[t, cols.index(pick)] = 1.0 w = w.replace(0.0, np.nan).ffill().fillna(0.0) w = w.shift(1).fillna(0.0) # Vol-target overlay daily_ret = data[cols].pct_change(fill_method=None).fillna(0.0) port_rets = (w * daily_ret).sum(axis=1) realized_vol = port_rets.rolling(self.vol_window, min_periods=21).std() * np.sqrt(252) scale = (self.target_vol / realized_vol).clip(lower=self.min_lev, upper=self.max_lev) scale = scale.shift(1).fillna(1.0) w = w.mul(scale, axis=0) # Profit-take if self.pt_threshold <= 0: return w held = w.idxmax(axis=1) max_w = w.max(axis=1) held[max_w < 1e-8] = "" park_col = self.pt_park if self.pt_park in w.columns else "" entry_price = None current_sym = None is_stopped = False restore_level = self.pt_threshold - self.pt_band for i in range(len(w)): sym = held.iloc[i] if not sym or max_w.iloc[i] < 1e-8: current_sym, entry_price, is_stopped = None, None, False continue if sym != current_sym: current_sym = sym entry_price = float(data[sym].iloc[i-1]) if i > 0 and sym in data.columns else None is_stopped = False continue if entry_price is None or entry_price <= 0 or sym not in data.columns: continue yesterday = float(data[sym].iloc[i-1]) if i > 0 else float(data[sym].iloc[i]) gain = yesterday / entry_price - 1.0 if is_stopped: if gain < restore_level: is_stopped = False else: w.iloc[i] = 0.0 if park_col: w.at[w.index[i], park_col] = scale.iloc[i] else: if gain >= self.pt_threshold: is_stopped = True w.iloc[i] = 0.0 if park_col: w.at[w.index[i], park_col] = scale.iloc[i] return w # Regime functions def regime_ma(window: int): """Simple MA crossover: above MA → risk_on.""" def fn(closes, current): if len(closes) < window: return "risk_off" return "risk_on" if closes[-1] > np.mean(closes[-window:]) else "risk_off" return fn def regime_dual_ma(short: int = 50, long: int = 200): """Golden/death cross: MA_short > MA_long → risk_on.""" def fn(closes, current): if len(closes) < long: return "risk_off" ma_s = np.mean(closes[-short:]) ma_l = np.mean(closes[-long:]) return "risk_on" if ma_s > ma_l else "risk_off" return fn def regime_roc(window: int = 63): """Rate of change: positive N-day return → risk_on.""" def fn(closes, current): if len(closes) < window + 1 or closes[-window-1] <= 0: return "risk_off" roc = closes[-1] / closes[-window-1] - 1.0 return "risk_on" if roc > 0 else "risk_off" return fn def regime_ma_plus_vol(ma_window: int = 150, vol_window: int = 20, vol_cap: float = 0.20): """MA + vol filter: above MA AND vol < cap → risk_on.""" def fn(closes, current): if len(closes) < max(ma_window, vol_window + 1): return "risk_off" above_ma = closes[-1] > np.mean(closes[-ma_window:]) if not above_ma: return "risk_off" rets = np.diff(closes[-vol_window-1:]) / np.maximum(closes[-vol_window-1:-1], 1e-12) vol = float(np.std(rets, ddof=1) * np.sqrt(252)) return "risk_on" if vol < vol_cap else "risk_off" return fn def regime_ma_slope(ma_window: int = 150, slope_window: int = 10): """MA + positive slope: above MA AND MA is rising → risk_on.""" def fn(closes, current): if len(closes) < ma_window + slope_window: return "risk_off" ma_now = np.mean(closes[-ma_window:]) ma_prev = np.mean(closes[-ma_window - slope_window:-slope_window]) above = closes[-1] > ma_now rising = ma_now > ma_prev return "risk_on" if (above and rising) else "risk_off" return fn def regime_composite(ma_w: int = 150, roc_w: int = 63, vol_w: int = 20, vol_cap: float = 0.22, threshold: int = 2): """Composite: score from MA + ROC + vol. Need ≥ threshold signals for risk_on.""" def fn(closes, current): if len(closes) < max(ma_w, roc_w + 1, vol_w + 1): return "risk_off" score = 0 # Signal 1: above MA if closes[-1] > np.mean(closes[-ma_w:]): score += 1 # Signal 2: positive ROC if closes[-roc_w-1] > 0 and closes[-1] / closes[-roc_w-1] - 1.0 > 0: score += 1 # Signal 3: vol below cap rets = np.diff(closes[-vol_w-1:]) / np.maximum(closes[-vol_w-1:-1], 1e-12) vol = float(np.std(rets, ddof=1) * np.sqrt(252)) if vol < vol_cap: score += 1 return "risk_on" if score >= threshold else "risk_off" return fn def regime_adaptive_ma(fast: int = 100, slow: int = 200, vol_w: int = 60, vol_threshold: float = 0.18): """Adaptive MA: use fast MA in low vol, slow MA in high vol. High vol → slower signal → fewer whipsaws.""" def fn(closes, current): if len(closes) < slow: return "risk_off" rets = np.diff(closes[-vol_w-1:]) / np.maximum(closes[-vol_w-1:-1], 1e-12) vol = float(np.std(rets, ddof=1) * np.sqrt(252)) ma_w = slow if vol > vol_threshold else fast return "risk_on" if closes[-1] > np.mean(closes[-ma_w:]) else "risk_off" return fn def direction_c(stock_data, etf_data, stock_tickers, all_etfs): print("\n" + "=" * 100) print(" DIRECTION C: IMPROVED REGIME ENGINE") print("=" * 100) etf_cols = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_data.columns] results = [] # V7+VT36 baseline (uses V3's full regime with MA+vol+dd+peak gates) label, eq, m = run_and_report( "V7+VT36 (V3 full regime, baseline)", TrendRiderV7(target_vol=0.36, min_lev=0.75), etf_data[etf_cols]) results.append((label, eq, m)) print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}") regime_configs = [ # Simple MA variants ("Simple MA100", regime_ma(100)), ("Simple MA150", regime_ma(150)), ("Simple MA200", regime_ma(200)), # Dual MA ("Dual MA 50/200", regime_dual_ma(50, 200)), ("Dual MA 50/150", regime_dual_ma(50, 150)), ("Dual MA 20/100", regime_dual_ma(20, 100)), # ROC ("ROC 63d", regime_roc(63)), ("ROC 126d", regime_roc(126)), # MA + vol filter ("MA150 + Vol<20%", regime_ma_plus_vol(150, 20, 0.20)), ("MA150 + Vol<25%", regime_ma_plus_vol(150, 20, 0.25)), ("MA200 + Vol<20%", regime_ma_plus_vol(200, 20, 0.20)), # MA + slope ("MA150 + Rising (10d)", regime_ma_slope(150, 10)), ("MA150 + Rising (20d)", regime_ma_slope(150, 20)), # Composite ("Composite 2/3 (MA150+ROC63+Vol)", regime_composite(150, 63, 20, 0.22, 2)), ("Composite 3/3 (all must agree)", regime_composite(150, 63, 20, 0.22, 3)), # Adaptive MA ("Adaptive MA100/200 (vol pivot 18%)", regime_adaptive_ma(100, 200, 60, 0.18)), ("Adaptive MA100/200 (vol pivot 22%)", regime_adaptive_ma(100, 200, 60, 0.22)), ] for label, regime_fn in regime_configs: try: strategy = V7AltRegime(regime_func=regime_fn) _, eq, m = run_and_report(f"AltRegime: {label}", strategy, etf_data[etf_cols]) results.append((f"AltRegime: {label}", eq, m)) print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%") except Exception as e: print(f" FAILED {label}: {e}") results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True) print(f"\n--- Direction C Results (sorted by Sharpe) ---") print_table(results) return results # ========================================================================= # MAIN # ========================================================================= def main(): stock_data, etf_data, stock_tickers, all_etfs = load_all_data() results_a = direction_a(stock_data, etf_data, stock_tickers, all_etfs) results_b = direction_b(stock_data, etf_data, stock_tickers, all_etfs) results_c = direction_c(stock_data, etf_data, stock_tickers, all_etfs) # Final summary print("\n" + "=" * 100) print(" CROSS-DIRECTION SUMMARY") print("=" * 100) all_results = ( [(f"[A] {l}", eq, m) for l, eq, m in results_a] + [(f"[B] {l}", eq, m) for l, eq, m in results_b] + [(f"[C] {l}", eq, m) for l, eq, m in results_c] ) all_results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True) print(f"\nTop 10 by Sharpe across all directions:") print(f"{'#':<4} {'Strategy':<60} {'Ann%':>7} {'Sharpe':>7} {'MaxDD%':>7} {'Calmar':>7}") print("-" * 100) for i, (label, _, m) in enumerate(all_results[:10], 1): print(f"{i:<4} {label:<60} " f"{m['annualizedReturn']*100:>6.1f}% " f"{m['sharpeRatio']:>7.2f} " f"{m['maxDrawdown']*100:>6.1f}% " f"{m['calmarRatio']:>7.2f}") print(f"\nTop 10 by Ann. Return across all directions:") all_results.sort(key=lambda x: x[2]["annualizedReturn"], reverse=True) print(f"{'#':<4} {'Strategy':<60} {'Ann%':>7} {'Sharpe':>7} {'MaxDD%':>7} {'Calmar':>7}") print("-" * 100) for i, (label, _, m) in enumerate(all_results[:10], 1): print(f"{i:<4} {label:<60} " f"{m['annualizedReturn']*100:>6.1f}% " f"{m['sharpeRatio']:>7.2f} " f"{m['maxDrawdown']*100:>6.1f}% " f"{m['calmarRatio']:>7.2f}") if __name__ == "__main__": main()