"""V7 parameter sweep: vol-target range + adaptive profit-take variants. Direction 1: higher vol-target (VT24 → VT48) Direction 3: adaptive profit-take (vol-scaled, time-decay, combined) """ from __future__ import annotations import sys sys.path.insert(0, ".") import numpy as np import pandas as pd import data_manager import metrics from main import backtest from strategies.base import Strategy from strategies.permanent import TrendRiderV3 YEARS = 10 CAPITAL = 100_000 TX_COST = 0.001 FIXED_FEE = 2.0 ETF_TICKERS = ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] # --------------------------------------------------------------------------- # Adaptive V7: modular profit-take that accepts a callable threshold # --------------------------------------------------------------------------- class TrendRiderV7Adaptive(Strategy): """V7 with pluggable profit-take logic. pt_func(gain, realized_vol, days_held) -> (threshold, restore_level) If pt_func is None, no profit-take is applied. """ def __init__( self, ma_long: int = 150, signal: str = "SPY", risk_on: tuple[str, ...] = ("TQQQ", "UPRO"), risk_off: tuple[str, ...] = ("GLD", "DBC"), target_vol: float = 0.28, vol_window: int = 60, min_lev: float = 0.6, max_lev: float = 1.0, pt_func=None, pt_park: str = "SHY", **v3_kwargs, ) -> None: self.target_vol = target_vol self.vol_window = vol_window self.min_lev = min_lev self.max_lev = max_lev self.pt_func = pt_func self.pt_park = pt_park self.v3 = TrendRiderV3( signal=signal, risk_on=risk_on, risk_off=risk_off, ma_long=ma_long, **v3_kwargs, ) def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: w = self.v3.generate_signals(data) # Vol-target overlay daily_ret = data.pct_change(fill_method=None).fillna(0.0) port_rets = (w * daily_ret).sum(axis=1) realized_vol = ( port_rets.rolling(self.vol_window, min_periods=21).std() * np.sqrt(252) ) scale = (self.target_vol / realized_vol).clip( lower=self.min_lev, upper=self.max_lev, ) scale = scale.shift(1).fillna(1.0) w = w.mul(scale, axis=0) if self.pt_func is None: return w # Adaptive profit-take held = w.idxmax(axis=1) max_w = w.max(axis=1) held[max_w < 1e-8] = "" park_col = self.pt_park if self.pt_park in w.columns else "" entry_price: float | None = None current_sym: str | None = None is_stopped = False days_held = 0 for i in range(len(w)): sym = held.iloc[i] if not sym or max_w.iloc[i] < 1e-8: current_sym = None entry_price = None is_stopped = False days_held = 0 continue if sym != current_sym: current_sym = sym entry_price = ( float(data[sym].iloc[i - 1]) if i > 0 and sym in data.columns else None ) is_stopped = False days_held = 0 continue days_held += 1 if entry_price is None or entry_price <= 0 or sym not in data.columns: continue yesterday = float(data[sym].iloc[i - 1]) if i > 0 else float(data[sym].iloc[i]) gain = yesterday / entry_price - 1.0 rv = float(realized_vol.iloc[i]) if not np.isnan(realized_vol.iloc[i]) else 0.25 threshold, restore_level = self.pt_func(gain, rv, days_held) if is_stopped: if gain < restore_level: is_stopped = False else: w.iloc[i] = 0.0 if park_col: w.at[w.index[i], park_col] = scale.iloc[i] else: if gain >= threshold: is_stopped = True w.iloc[i] = 0.0 if park_col: w.at[w.index[i], park_col] = scale.iloc[i] return w # --------------------------------------------------------------------------- # Profit-take function factories # --------------------------------------------------------------------------- def fixed_pt(threshold: float, band: float): """Classic fixed threshold (V7 default).""" def fn(gain, rv, days_held): return threshold, threshold - band return fn def vol_adaptive_pt(base_threshold: float = 0.30, base_vol: float = 0.25, band_ratio: float = 0.33, lo: float = 0.15, hi: float = 0.50): """Threshold scales inversely with realized vol. High vol → lower threshold (harvest earlier, vol drag is worse). Low vol → higher threshold (let profits run, drag is mild). """ def fn(gain, rv, days_held): rv = max(rv, 0.05) t = np.clip(base_threshold * (base_vol / rv), lo, hi) return t, t * (1 - band_ratio) return fn def time_decay_pt(start_threshold: float = 0.40, end_threshold: float = 0.18, decay_days: int = 120, band_ratio: float = 0.33): """Threshold decays linearly over holding period. Rationale: longer holds accumulate more vol drag → take profits earlier. """ def fn(gain, rv, days_held): frac = min(days_held / decay_days, 1.0) t = start_threshold - frac * (start_threshold - end_threshold) return t, t * (1 - band_ratio) return fn def combined_pt(base_threshold: float = 0.30, base_vol: float = 0.25, time_decay_rate: float = 0.0005, min_threshold: float = 0.12, max_threshold: float = 0.50, band_ratio: float = 0.33): """Vol-adaptive + time decay combined.""" def fn(gain, rv, days_held): rv = max(rv, 0.05) vol_adj = base_threshold * (base_vol / rv) time_adj = vol_adj - days_held * time_decay_rate t = np.clip(time_adj, min_threshold, max_threshold) return t, t * (1 - band_ratio) return fn def trailing_stop_pt(initial_threshold: float = 0.30, trail_pct: float = 0.15, band_ratio: float = 0.33): """Once gain exceeds threshold, switch to trailing stop from peak gain. Lets winners run further but protects from reversal. """ # We need state across calls, so use a mutable closure state = {"peak_gain": 0.0, "trailing_active": False} def fn(gain, rv, days_held): if days_held == 1: state["peak_gain"] = 0.0 state["trailing_active"] = False if state["trailing_active"]: state["peak_gain"] = max(state["peak_gain"], gain) trail_level = state["peak_gain"] * (1 - trail_pct) if gain < trail_level: return -1.0, -1.0 # trigger immediately return float("inf"), float("inf") # don't trigger via threshold else: if gain >= initial_threshold: state["trailing_active"] = True state["peak_gain"] = gain return float("inf"), float("inf") return initial_threshold, initial_threshold * (1 - band_ratio) return fn # --------------------------------------------------------------------------- # Main sweep # --------------------------------------------------------------------------- def main(): print("=" * 100) print(" V7 PARAMETER SWEEP: Vol-Target + Adaptive Profit-Take") print("=" * 100) # Load ETF data print("\n[1] Loading ETF data...") etf_data = data_manager.update("etfs", ETF_TICKERS, with_open=False) if isinstance(etf_data, tuple): etf_data = etf_data[0] cutoff = etf_data.index[-1] - pd.DateOffset(years=YEARS) etf_data = etf_data[etf_data.index >= cutoff] tradable = [t for t in ETF_TICKERS if t in etf_data.columns] print(f" Period: {etf_data.index[0].date()} → {etf_data.index[-1].date()}") results: list[tuple[str, str, dict]] = [] def run(group: str, label: str, strategy: Strategy): eq = backtest(strategy, etf_data[tradable], initial_capital=CAPITAL, transaction_cost=TX_COST, fixed_fee=FIXED_FEE) m = metrics.raw_summary(eq) results.append((group, label, m)) print(f" {label:<45} Ann={m['annualizedReturn']*100:.1f}% " f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}%") # ===================================================================== # SWEEP 1: Vol-target range (with fixed PT30) # ===================================================================== print("\n[2] Sweep 1: Vol-target range (PT30 fixed)") print("-" * 70) vt_configs = [ ("VT20", 0.20, 0.45), ("VT24", 0.24, 0.50), ("VT28 (default)", 0.28, 0.60), ("VT32", 0.32, 0.70), ("VT36", 0.36, 0.75), ("VT40", 0.40, 0.80), ("VT44", 0.44, 0.85), ("VT48", 0.48, 0.90), ("No VT (raw V3+PT30)", 0.28, 1.0), # min_lev=max_lev=1.0 → no scaling ] for label, tv, ml in vt_configs: if label.startswith("No VT"): s = TrendRiderV7Adaptive(target_vol=1.0, min_lev=1.0, max_lev=1.0, pt_func=fixed_pt(0.30, 0.10)) else: s = TrendRiderV7Adaptive(target_vol=tv, min_lev=ml, pt_func=fixed_pt(0.30, 0.10)) run("VT sweep", label, s) # ===================================================================== # SWEEP 2: Profit-take variants (using best VT from sweep 1) # ===================================================================== print("\n[3] Sweep 2: Profit-take variants (VT32)") print("-" * 70) best_vt = 0.32 best_ml = 0.70 pt_configs: list[tuple[str, object]] = [ # Fixed thresholds ("No PT (ablation)", None), ("Fixed PT15 band=5", fixed_pt(0.15, 0.05)), ("Fixed PT20 band=8", fixed_pt(0.20, 0.08)), ("Fixed PT25 band=10", fixed_pt(0.25, 0.10)), ("Fixed PT30 band=10 (default)", fixed_pt(0.30, 0.10)), ("Fixed PT35 band=12", fixed_pt(0.35, 0.12)), ("Fixed PT40 band=15", fixed_pt(0.40, 0.15)), ("Fixed PT50 band=15", fixed_pt(0.50, 0.15)), # Vol-adaptive ("Vol-adaptive (base=30%, lo=15%)", vol_adaptive_pt(0.30, 0.25, 0.33, 0.15, 0.50)), ("Vol-adaptive (base=25%, lo=12%)", vol_adaptive_pt(0.25, 0.25, 0.33, 0.12, 0.45)), ("Vol-adaptive (base=35%, lo=18%)", vol_adaptive_pt(0.35, 0.25, 0.33, 0.18, 0.55)), # Time-decay ("Time-decay (40%→18%, 120d)", time_decay_pt(0.40, 0.18, 120)), ("Time-decay (35%→15%, 90d)", time_decay_pt(0.35, 0.15, 90)), ("Time-decay (45%→20%, 150d)", time_decay_pt(0.45, 0.20, 150)), # Combined ("Combined vol+time (base=30%)", combined_pt(0.30, 0.25, 0.0005, 0.12, 0.50)), ("Combined vol+time (base=25%)", combined_pt(0.25, 0.25, 0.0005, 0.10, 0.45)), ] for label, pt_fn in pt_configs: s = TrendRiderV7Adaptive(target_vol=best_vt, min_lev=best_ml, pt_func=pt_fn) run("PT sweep", label, s) # ===================================================================== # SWEEP 3: Best PT × VT grid (narrow search around top combos) # ===================================================================== print("\n[4] Sweep 3: Best combos (VT × PT grid)") print("-" * 70) grid = [ (0.32, 0.70, "Fixed PT30", fixed_pt(0.30, 0.10)), (0.36, 0.75, "Fixed PT30", fixed_pt(0.30, 0.10)), (0.40, 0.80, "Fixed PT30", fixed_pt(0.30, 0.10)), (0.32, 0.70, "Fixed PT25", fixed_pt(0.25, 0.10)), (0.36, 0.75, "Fixed PT25", fixed_pt(0.25, 0.10)), (0.40, 0.80, "Fixed PT25", fixed_pt(0.25, 0.10)), (0.32, 0.70, "Vol-adapt 30%", vol_adaptive_pt(0.30, 0.25, 0.33, 0.15, 0.50)), (0.36, 0.75, "Vol-adapt 30%", vol_adaptive_pt(0.30, 0.25, 0.33, 0.15, 0.50)), (0.40, 0.80, "Vol-adapt 30%", vol_adaptive_pt(0.30, 0.25, 0.33, 0.15, 0.50)), (0.32, 0.70, "Time-decay 40→18", time_decay_pt(0.40, 0.18, 120)), (0.36, 0.75, "Time-decay 40→18", time_decay_pt(0.40, 0.18, 120)), (0.40, 0.80, "Time-decay 40→18", time_decay_pt(0.40, 0.18, 120)), (0.32, 0.70, "Combined 30%", combined_pt(0.30, 0.25, 0.0005, 0.12, 0.50)), (0.36, 0.75, "Combined 30%", combined_pt(0.30, 0.25, 0.0005, 0.12, 0.50)), (0.40, 0.80, "Combined 30%", combined_pt(0.30, 0.25, 0.0005, 0.12, 0.50)), ] for tv, ml, pt_label, pt_fn in grid: label = f"VT{int(tv*100)} + {pt_label}" s = TrendRiderV7Adaptive(target_vol=tv, min_lev=ml, pt_func=pt_fn) run("Grid", label, s) # ===================================================================== # Final ranking # ===================================================================== results.sort(key=lambda x: x[2]["annualizedReturn"], reverse=True) print(f"\n{'=' * 115}") print(" FINAL RANKING (sorted by annualized return)") print(f"{'=' * 115}") print(f"{'#':<4} {'Group':<12} {'Config':<45} {'Ann%':>7} {'Vol%':>7} {'Sharpe':>7} " f"{'Sortino':>8} {'MaxDD%':>7} {'Calmar':>7}") print("-" * 115) for i, (group, label, m) in enumerate(results, 1): ann = m["annualizedReturn"] * 100 vol = m["annualizedVolatility"] * 100 sr = m["sharpeRatio"] so = m["sortinoRatio"] dd = m["maxDrawdown"] * 100 ca = m["calmarRatio"] marker = " ★" if i <= 3 else "" print(f"{i:<4} {group:<12} {label:<45} {ann:>6.1f}% {vol:>6.1f}% {sr:>7.2f} " f"{so:>8.2f} {dd:>6.1f}% {ca:>7.2f}{marker}") print(f"{'=' * 115}") # Highlight top by Sharpe by_sharpe = sorted(results, key=lambda x: x[2]["sharpeRatio"], reverse=True) print("\nTop 5 by Sharpe:") for i, (group, label, m) in enumerate(by_sharpe[:5], 1): print(f" {i}. {label:<45} Sharpe={m['sharpeRatio']:.3f} " f"Ann={m['annualizedReturn']*100:.1f}% MaxDD={m['maxDrawdown']*100:.1f}%") by_calmar = sorted(results, key=lambda x: x[2]["calmarRatio"], reverse=True) print("\nTop 5 by Calmar:") for i, (group, label, m) in enumerate(by_calmar[:5], 1): print(f" {i}. {label:<45} Calmar={m['calmarRatio']:.3f} " f"Ann={m['annualizedReturn']*100:.1f}% MaxDD={m['maxDrawdown']*100:.1f}%") if __name__ == "__main__": main()