"""TrendRiderV5 — V3 with conviction-gated leverage tier modulation. Design rationale ---------------- V3 picks one of {TQQQ, UPRO, GLD, DBC} and rides it 100%. Its 75 regime switches over 11 years are the *correct* edge — we don't disturb them. V5 layers a small post-processor: at each rebalance event V3 produces, V5 inspects the prevailing conviction and decides what fraction of the equity sleeve is held in the 3× ETF vs its 1× counterpart. The state is a discrete *leverage tier* in {0%, 50%, 100%} of leveraged exposure, with hysteresis and minimum holding to keep turnover low. Specifically pair: SPY ↔ UPRO, QQQ ↔ TQQQ tier 0 (core_only) : 100% core (1× equity) tier 1 (half) : 50% core + 50% leveraged (≈ 2× equity) tier 2 (full) : 100% leveraged (3× equity) Conviction is built from directional/regime-quality signals (trend strength, drawdown depth, peak distance, downside-vol percentile). It is NOT a function of two-sided realized vol — that throttled V5 in good periods. Tier transitions require: promote (k → k+1) : conviction ≥ promote_threshold[k+1] for confirm_days demote (k → k-1) : conviction ≤ demote_threshold[k] for demote_confirm with `tier_min_hold` bars between any tier change. Risk-off behavior is unchanged from V3 (single-pick momentum leader of the risk_off basket), preserving V3's defensive characteristics. """ from __future__ import annotations import numpy as np import pandas as pd from strategies.permanent import TrendRiderV3 class TrendRiderV5(TrendRiderV3): """V3 + leverage-tier modulator on the equity sleeve. Default tier thresholds aim for: full 3× only when (a) below-MA200 risk is small, (b) we're near the 20-day high, and (c) drawdowns from the recent peak are inside ~1 vol-unit. Otherwise step down to 1× or 1.5×. """ DEFAULT_LEVERAGED_PAIR = {"SPY": "UPRO", "QQQ": "TQQQ"} DEFAULT_CORE_PAIR = {"UPRO": "SPY", "TQQQ": "QQQ"} def __init__( self, *args, # Conviction inputs peak_window: int = 20, dd_window: int = 40, trend_lookback: int = 63, downvol_window: int = 60, downvol_lookback: int = 252, # Tier thresholds [tier 1, tier 2] for promote / demote (hysteresis) promote_thresholds: tuple[float, float] = (0.40, 0.65), demote_thresholds: tuple[float, float] = (0.30, 0.50), promote_confirm: int = 5, demote_confirm: int = 3, tier_min_hold: int = 10, starting_tier: int = 2, # if regime is risk_on at first placement, start at 2 (full lev) # Panic demote — bypasses min-hold when fast vol regime detected. # Defaults below were chosen by walk-forward Calmar maximization on # IS (2015-2020, which does NOT contain the 2024-08 crash) — not # curve-fit to that specific event. panic_vol_short: int = 7, panic_vol_long: int = 60, panic_vol_ratio: float = 1.6, panic_peak_drop_pct: float = 0.06, panic_peak_window: int = 5, # Conviction component weights w_trend: float = 0.30, w_dd: float = 0.30, w_peak: float = 0.25, w_downvol: float = 0.15, # Pair mapping leveraged_pair: dict[str, str] | None = None, core_pair: dict[str, str] | None = None, **kwargs, ) -> None: super().__init__(*args, **kwargs) self.peak_window = peak_window self.dd_window = dd_window self.trend_lookback = trend_lookback self.downvol_window = downvol_window self.downvol_lookback = downvol_lookback self.promote_thresholds = promote_thresholds self.demote_thresholds = demote_thresholds self.promote_confirm = promote_confirm self.demote_confirm = demote_confirm self.tier_min_hold = tier_min_hold self.starting_tier = starting_tier self.panic_vol_short = panic_vol_short self.panic_vol_long = panic_vol_long self.panic_vol_ratio = panic_vol_ratio self.panic_peak_drop_pct = panic_peak_drop_pct self.panic_peak_window = panic_peak_window self.w_trend = w_trend self.w_dd = w_dd self.w_peak = w_peak self.w_downvol = w_downvol self.leveraged_pair = leveraged_pair or dict(self.DEFAULT_LEVERAGED_PAIR) self.core_pair = core_pair or dict(self.DEFAULT_CORE_PAIR) # ---- Conviction features ---- @staticmethod def _clip01(x: float) -> float: if not np.isfinite(x): return 0.0 return float(min(1.0, max(0.0, x))) def _panic_demote(self, sig_closes: np.ndarray) -> bool: """Detect fast vol regime / sharp peak velocity → panic demote tier 2→0.""" if sig_closes.size < self.panic_vol_long + 1: return False # Short vs long realized vol rets = np.diff(sig_closes[-(self.panic_vol_long + 1):]) / np.maximum( sig_closes[-(self.panic_vol_long + 1):-1], 1e-12 ) if rets.size < self.panic_vol_long: return False long_vol = float(rets.std(ddof=1)) short_rets = rets[-self.panic_vol_short:] short_vol = float(short_rets.std(ddof=1)) if short_rets.size > 1 else 0.0 if long_vol > 0 and short_vol / long_vol >= self.panic_vol_ratio: return True # Peak-velocity: drop > X% in last N days from rolling peak window = sig_closes[-self.panic_peak_window:] if window.size >= 2: peak = float(window.max()) drop = (peak - float(sig_closes[-1])) / max(peak, 1e-12) if drop >= self.panic_peak_drop_pct: return True return False def _conviction(self, sig_closes: np.ndarray) -> float: """Directional conviction in [0, 1] — higher means cleaner trend.""" n = sig_closes.size if n < max(self.ma_long, self.trend_lookback, self.downvol_lookback + self.downvol_window) + 1: return 0.0 last = float(sig_closes[-1]) # 1) Trend score: distance above MA200 in vol-units ma_long = float(sig_closes[-self.ma_long:].mean()) rets = np.diff(sig_closes[-self.downvol_window - 1:]) / np.maximum( sig_closes[-self.downvol_window - 1:-1], 1e-12 ) ann_vol = float(rets.std(ddof=1) * np.sqrt(252)) if rets.size > 1 else 0.20 ann_vol = max(ann_vol, 1e-3) trend_units = (last / ma_long - 1.0) / ann_vol # vol-units (annualized) trend_score = self._clip01(trend_units / 0.50) # ~0.50 vol-unit = strong # 2) Drawdown score: shallower = better dd_window_arr = sig_closes[-self.dd_window:] dd = float(last / dd_window_arr.max() - 1.0) # ≤ 0 period_vol = ann_vol / np.sqrt(252) * np.sqrt(self.dd_window) dd_units = -dd / max(period_vol, 1e-4) dd_score = self._clip01(1.0 - dd_units / 2.5) # 2.5 vol-units → 0 # 3) Peak-distance score peak_arr = sig_closes[-self.peak_window:] peak_ratio = float(last / peak_arr.max()) peak_period_vol = ann_vol / np.sqrt(252) * np.sqrt(self.peak_window) peak_drop_units = (1.0 - peak_ratio) / max(peak_period_vol, 1e-4) peak_score = self._clip01(1.0 - peak_drop_units / 2.0) # 4) Downside-vol percentile (lower = better) full_rets = np.diff(sig_closes[-(self.downvol_lookback + self.downvol_window):]) / np.maximum( sig_closes[-(self.downvol_lookback + self.downvol_window):-1], 1e-12 ) # Rolling downside semideviation s = pd.Series(full_rets) downside = s.where(s < 0, 0.0) dv_series = downside.rolling(self.downvol_window).std(ddof=1) * np.sqrt(252) dv_now = float(dv_series.iloc[-1]) if not dv_series.empty else np.nan dv_history = dv_series.dropna().to_numpy() if dv_history.size == 0 or not np.isfinite(dv_now): downvol_score = 0.5 else: pct = float((dv_history < dv_now).mean()) downvol_score = 1.0 - pct # low downvol → high score score = ( self.w_trend * trend_score + self.w_dd * dd_score + self.w_peak * peak_score + self.w_downvol * downvol_score ) total_w = self.w_trend + self.w_dd + self.w_peak + self.w_downvol return float(score / max(total_w, 1e-9)) # ---- Tier state ---- def _tier_for(self, conviction: float, current: int, pending_promote: int, pending_demote: int) -> tuple[int, int, int]: """Update tier given conviction. Returns (new_tier, new_pp, new_pd).""" new_tier = current # Demote first (safety > greed) if current >= 1 and conviction <= self.demote_thresholds[current - 1]: pending_demote += 1 pending_promote = 0 if pending_demote >= self.demote_confirm: new_tier = max(0, current - 1) pending_demote = 0 return new_tier, pending_promote, pending_demote else: pending_demote = 0 # Promote target = current if current < 2 and conviction >= self.promote_thresholds[current]: pending_promote += 1 if pending_promote >= self.promote_confirm: target = min(2, current + 1) pending_promote = 0 else: pending_promote = 0 return target, pending_promote, pending_demote def _equity_blend(self, sym: str, tier: int, cols: list[str]) -> dict[str, float]: """Blend a chosen symbol with its leveraged/core counterpart by tier.""" # If V3 picked a leveraged sym (TQQQ/UPRO), map to core counterpart if sym in self.core_pair: lev_sym = sym core_sym = self.core_pair[sym] elif sym in self.leveraged_pair: core_sym = sym lev_sym = self.leveraged_pair[sym] else: # No leveraged variant available → 100% as-is return {sym: 1.0} if core_sym not in cols and lev_sym not in cols: return {sym: 1.0} if core_sym not in cols: return {lev_sym: 1.0} if lev_sym not in cols: return {core_sym: 1.0} if tier == 0: return {core_sym: 1.0} if tier == 1: return {core_sym: 0.5, lev_sym: 0.5} return {lev_sym: 1.0} # ---- Override: post-process V3 weights ---- def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: # 1) Get V3's PIT-safe weights (already shifted) v3_w = super().generate_signals(data) # We need to "un-shift" V3 weights to align with the day they were decided, # apply tier blending in that frame, then re-shift. Easier: work directly # in the signal frame (which is v3_w's index, with row t = position for t). # Since super() already shifted by 1, v3_w.iloc[t] is the *position* held # on day t (decided on close of t-1). We modulate row-by-row. sig = data[self.signal] if self.signal in data.columns else None if sig is None: return v3_w sig_arr = sig.to_numpy() cols = list(v3_w.columns) # Make sure leveraged/core counterparts exist as columns; expand if not extra_cols = [] for sym in (*self.core_pair.keys(), *self.leveraged_pair.keys()): if sym in data.columns and sym not in cols: extra_cols.append(sym) if extra_cols: for c in extra_cols: v3_w[c] = 0.0 cols = list(v3_w.columns) out = pd.DataFrame(0.0, index=v3_w.index, columns=cols) # Tier state tier = 0 # start at 0 — promotions happen via confirm pending_promote = 0 pending_demote = 0 tier_age = 0 prev_active_sym: str | None = None first_risk_on_seen = False for t in range(len(v3_w)): row = v3_w.iloc[t] active = row[row > 0] if active.empty: # No position → no modulation tier = 0 pending_promote = pending_demote = 0 tier_age = 0 prev_active_sym = None continue sym = active.idxmax() # V3 outputs 100% to one symbol # Compute conviction from signal closes through t-1 (already PIT) # v3_w.iloc[t] reflects position decided on close(t-1), so we can # use sig_arr[:t] as available info. sig_closes = sig_arr[: t] if sig_closes.size == 0: continue conviction = self._conviction(sig_closes) # Detect new active position is_equity = sym in self.core_pair or sym in self.leveraged_pair if not is_equity: # Risk-off: pass through, reset tier state tier = 0 pending_promote = pending_demote = 0 tier_age = 0 prev_active_sym = sym out.iloc[t] = row continue if prev_active_sym != sym: # Fresh entry into equity sleeve if not first_risk_on_seen: tier = self.starting_tier first_risk_on_seen = True else: # Initialize tier from current conviction if conviction >= self.promote_thresholds[1]: tier = 2 elif conviction >= self.promote_thresholds[0]: tier = 1 else: tier = 0 pending_promote = pending_demote = 0 tier_age = 0 # Panic demote — bypasses min-hold and conviction logic panic = self._panic_demote(sig_closes) if panic and tier > 0: tier = 0 tier_age = 0 pending_promote = pending_demote = 0 else: # Tier transition logic with min-hold new_tier = tier if tier_age >= self.tier_min_hold: new_tier, pending_promote, pending_demote = self._tier_for( conviction, tier, pending_promote, pending_demote ) if new_tier != tier: tier_age = 0 tier = new_tier else: tier_age += 1 else: tier_age += 1 # Even within min-hold, allow emergency demote if conviction crashes if tier > 0 and conviction <= self.demote_thresholds[tier - 1] * 0.6: tier = max(0, tier - 1) tier_age = 0 pending_promote = pending_demote = 0 # Blend blend = self._equity_blend(sym, tier, cols) for s, ww in blend.items(): out.at[v3_w.index[t], s] = ww prev_active_sym = sym return out __all__ = ["TrendRiderV5"]