research: extensive V7 optimization and V8 (TMF) evaluation

Research scripts exploring paths beyond V7+VT36:
- regime_stock_picker_eval: V3 regime + S&P 500 stock picking
- v7_parameter_sweep: VT range (20-48%) + adaptive PT variants
- v7_synthetic_leverage_eval: synthetic 2x/3x leveraged individual stocks
- v7_breakthrough_eval/fixed: ensemble, cross-market, alt regime engines
- v7_three_ideas_eval: TMF risk-off, PT entry reset, fast exit
- v7_trade_audit: full 10y trade log and alpha attribution
- sota_ranking: comprehensive cross-strategy ranking

Key findings:
- VT36 is optimal risk-return tradeoff (+7% vs VT28, Sharpe ~flat)
- PT30 is structural optimum for 3x ETFs (all adaptive variants worse)
- V8 (TMF risk-off) debunked: +5% was 1-day lookahead bias artifact
- V3 regime engine irreplaceable (all simplified alternatives fail)
- PT mechanism is dominant alpha source (+15.6pp ann, +0.58 Sharpe)

V8 strategy file kept for reference (not registered).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-21 20:57:34 +08:00
parent b8bac26b8f
commit 1f50253d13
9 changed files with 3370 additions and 0 deletions

View File

@@ -0,0 +1,366 @@
"""Does V3 regime timing + S&P 500 stock picking improve over either alone?
Variants tested:
1. RegimeStockPicker top-10 — V3 regime, risk-on = top-10 momentum stocks
2. RegimeStockPicker top-20 — V3 regime, risk-on = top-20 momentum stocks
3. RegimeRecovery top-10 — V3 regime, risk-on = recovery+momentum top-10
4. RecoveryMomentum top-10 — pure stock picker, no regime filter (baseline)
5. TrendRider V7 — leveraged ETFs (current SOTA)
6. SPY buy-and-hold — benchmark
"""
from __future__ import annotations
import sys
sys.path.insert(0, ".")
import numpy as np
import pandas as pd
import data_manager
import metrics
import universe_history as uh
from main import backtest
from strategies.base import Strategy
from strategies.permanent import TrendRiderV3
from strategies.recovery_momentum import RecoveryMomentumStrategy
from strategies.trend_rider_v7 import TrendRiderV7
from universe import UNIVERSES
YEARS = 10
CAPITAL = 100_000
TX_COST = 0.001
FIXED_FEE = 2.0
# ---------------------------------------------------------------------------
# Strategy: V3 regime gate + cross-sectional momentum on S&P 500
# ---------------------------------------------------------------------------
class RegimeStockPicker(Strategy):
"""V3 macro regime + S&P 500 momentum stock selection.
Risk-on: equal-weight top-N stocks by ``mom_lookback``-day momentum.
Risk-off: momentum leader of (GLD, DBC).
"""
def __init__(
self,
stock_tickers: list[str],
top_n: int = 10,
signal: str = "SPY",
defensive: tuple[str, ...] = ("GLD", "DBC"),
ma_long: int = 150,
mom_lookback: int = 63,
rebal_every: int = 21,
):
self.stock_tickers = stock_tickers
self.top_n = top_n
self.signal = signal
self.defensive = defensive
self.ma_long = ma_long
self.mom_lookback = mom_lookback
self.rebal_every = rebal_every
self._v3 = TrendRiderV3(
signal=signal, risk_on=("TQQQ", "UPRO"), risk_off=defensive,
ma_long=ma_long,
)
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
w = pd.DataFrame(np.nan, index=data.index, columns=data.columns)
if self.signal not in data.columns:
return w.fillna(0.0)
sig_arr = data[self.signal].to_numpy()
mom = data.pct_change(self.mom_lookback, fill_method=None)
avail_stocks = [t for t in self.stock_tickers if t in data.columns]
avail_def = [t for t in self.defensive if t in data.columns]
need = max(self.ma_long, self.mom_lookback + 1,
self._v3.vol_window + 1, self._v3.dd_window,
self._v3.peak_window) + 1
regime: str | None = None
bars = 0
for i in range(len(data)):
if i < need:
continue
closes = sig_arr[:i]
if np.isnan(closes[-1]):
continue
desired = self._v3._desired_regime(closes, regime)
changed = False
if regime is None:
regime, bars, changed = desired, 0, True
else:
bars += 1
if desired != regime and bars >= 15:
regime, bars, changed = desired, 0, True
if not changed and (i - need) % self.rebal_every != 0:
continue
row = {c: 0.0 for c in data.columns}
dt = data.index[i]
if regime == "risk_on":
m = mom.iloc[i][avail_stocks].dropna()
valid = m.index[data.loc[dt, m.index].notna()]
m = m[valid]
m = m[m > 0]
top = m.nlargest(min(self.top_n, len(m)))
if len(top) > 0:
wt = 1.0 / len(top)
for t in top.index:
row[t] = wt
elif avail_def:
row[avail_def[0]] = 1.0
else:
if avail_def:
dm = mom.iloc[i][avail_def].dropna()
best = dm.idxmax() if len(dm) > 0 else avail_def[0]
row[best] = 1.0
for c, v in row.items():
w.at[dt, c] = v
w = w.ffill().fillna(0.0)
return w.shift(1).fillna(0.0)
# ---------------------------------------------------------------------------
# Strategy: V3 regime gate + recovery-momentum composite on S&P 500
# ---------------------------------------------------------------------------
class RegimeRecoveryPicker(Strategy):
"""V3 regime + recovery-momentum composite for stock selection.
Uses the same factor as RecoveryMomentumStrategy but only during risk-on.
"""
def __init__(
self,
stock_tickers: list[str],
top_n: int = 10,
signal: str = "SPY",
defensive: tuple[str, ...] = ("GLD", "DBC"),
ma_long: int = 150,
recovery_window: int = 63,
mom_lookback: int = 252,
mom_skip: int = 21,
rebal_every: int = 21,
):
self.stock_tickers = stock_tickers
self.top_n = top_n
self.signal = signal
self.defensive = defensive
self.ma_long = ma_long
self.recovery_window = recovery_window
self.mom_lookback = mom_lookback
self.mom_skip = mom_skip
self.rebal_every = rebal_every
self._v3 = TrendRiderV3(
signal=signal, risk_on=("TQQQ", "UPRO"), risk_off=defensive,
ma_long=ma_long,
)
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
w = pd.DataFrame(np.nan, index=data.index, columns=data.columns)
if self.signal not in data.columns:
return w.fillna(0.0)
sig_arr = data[self.signal].to_numpy()
stock_data = data[[t for t in self.stock_tickers if t in data.columns]]
recovery = stock_data / stock_data.rolling(self.recovery_window).min() - 1
momentum = stock_data.shift(self.mom_skip).pct_change(
self.mom_lookback - self.mom_skip, fill_method=None,
)
rec_rank = recovery.rank(axis=1, pct=True, na_option="keep")
mom_rank = momentum.rank(axis=1, pct=True, na_option="keep")
composite = 0.5 * rec_rank + 0.5 * mom_rank
stock_rank = composite.rank(axis=1, ascending=False, na_option="bottom")
def_mom = data[[t for t in self.defensive if t in data.columns]].pct_change(63, fill_method=None)
avail_def = [t for t in self.defensive if t in data.columns]
need = max(self.ma_long, self.mom_lookback + 1,
self._v3.vol_window + 1, self._v3.dd_window,
self._v3.peak_window, self.recovery_window) + 1
regime: str | None = None
bars = 0
for i in range(len(data)):
if i < need:
continue
closes = sig_arr[:i]
if np.isnan(closes[-1]):
continue
desired = self._v3._desired_regime(closes, regime)
changed = False
if regime is None:
regime, bars, changed = desired, 0, True
else:
bars += 1
if desired != regime and bars >= 15:
regime, bars, changed = desired, 0, True
if not changed and (i - need) % self.rebal_every != 0:
continue
row = {c: 0.0 for c in data.columns}
dt = data.index[i]
if regime == "risk_on":
ranks_i = stock_rank.iloc[i]
n_valid = composite.iloc[i].notna().sum()
if n_valid >= self.top_n:
top = ranks_i[ranks_i <= self.top_n].index
if len(top) > 0:
wt = 1.0 / len(top)
for t in top:
row[t] = wt
if sum(row.values()) < 0.01 and avail_def:
row[avail_def[0]] = 1.0
else:
if avail_def:
dm = def_mom.iloc[i][avail_def].dropna()
best = dm.idxmax() if len(dm) > 0 else avail_def[0]
row[best] = 1.0
for c, v in row.items():
w.at[dt, c] = v
w = w.ffill().fillna(0.0)
return w.shift(1).fillna(0.0)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
print("=" * 80)
print(" REGIME + STOCK PICKER EVALUATION")
print("=" * 80)
# --- Load S&P 500 data (PIT-safe) ---
print("\n[1/3] Loading S&P 500 universe (PIT)...")
universe = UNIVERSES["us"]
tickers = universe["fetch"]()
pit_intervals = uh.load_sp500_history()
hist_tickers = uh.all_tickers_ever(pit_intervals)
etf_extra = ["SPY", "GLD", "DBC", "SHY", "TQQQ", "UPRO", "TLT"]
all_tickers = sorted(set(tickers + hist_tickers + etf_extra))
print(f" Downloading {len(all_tickers)} tickers...")
data = data_manager.update("us", all_tickers, with_open=False)
if isinstance(data, tuple):
data = data[0]
cutoff = data.index[-1] - pd.DateOffset(years=YEARS)
data = data[data.index >= cutoff]
data = uh.mask_prices(data, pit_intervals)
stock_tickers = [
t for t in data.columns
if t not in etf_extra and data[t].notna().any()
]
print(f" Period: {data.index[0].date()}{data.index[-1].date()}")
print(f" Tradable stocks: {len(stock_tickers)}")
# --- Run all strategies ---
print("\n[2/3] Running strategies...")
results: dict[str, pd.Series] = {}
def run(name, strategy, price_data):
print(f" {name}...")
eq = backtest(strategy, price_data, initial_capital=CAPITAL,
transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
results[name] = eq
# 1-2. Regime + momentum top-N
for n in (10, 20):
run(f"Regime+Mom Top{n}",
RegimeStockPicker(stock_tickers=stock_tickers, top_n=n),
data)
# 3. Regime + recovery-momentum top-10
run("Regime+RecMom Top10",
RegimeRecoveryPicker(stock_tickers=stock_tickers, top_n=10),
data)
# 4. Regime + recovery-momentum top-20
run("Regime+RecMom Top20",
RegimeRecoveryPicker(stock_tickers=stock_tickers, top_n=20),
data)
# 5. Pure recovery momentum (no regime) — baseline
run("RecoveryMom Top10 (pure)",
RecoveryMomentumStrategy(top_n=10),
data[stock_tickers])
run("RecoveryMom Top20 (pure)",
RecoveryMomentumStrategy(top_n=20),
data[stock_tickers])
# 6. TrendRider V7 (leveraged ETFs)
etf_cols = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"]
if t in data.columns]
run("TrendRider V7 (3x ETFs)", TrendRiderV7(), data[etf_cols])
# 7. SPY benchmark
spy = data["SPY"].dropna()
results["SPY (benchmark)"] = (spy / spy.iloc[0]) * CAPITAL
# --- Report ---
print(f"\n[3/3] Results ({YEARS}y, ${CAPITAL:,.0f}, tx={TX_COST*100:.1f}bps + ${FIXED_FEE:.0f}/trade)")
print("=" * 100)
hdr = (f"{'Strategy':<30} {'Ann%':>8} {'Vol%':>8} {'Sharpe':>8} "
f"{'Sortino':>8} {'MaxDD%':>8} {'Calmar':>8} {'WinRate':>8}")
print(hdr)
print("-" * 100)
for name, eq in results.items():
m = metrics.raw_summary(eq)
print(f"{name:<30} {m['annualizedReturn']*100:>7.1f}% "
f"{m['annualizedVolatility']*100:>7.1f}% "
f"{m['sharpeRatio']:>8.2f} {m['sortinoRatio']:>8.2f} "
f"{m['maxDrawdown']*100:>7.1f}% {m['calmarRatio']:>8.2f} "
f"{m['winRate']*100:>7.1f}%")
print("=" * 100)
# Yearly breakdown for top strategies
print("\n--- Yearly Returns ---")
yearly: dict[str, dict[str, float]] = {}
for name, eq in results.items():
yr = {}
for year, grp in eq.groupby(eq.index.year):
if len(grp) >= 2:
yr[str(year)] = grp.iloc[-1] / grp.iloc[0] - 1
yearly[name] = yr
all_years = sorted(set(y for d in yearly.values() for y in d))
header = f"{'Year':<6}" + "".join(f"{name[:20]:>22}" for name in results)
print(header)
print("-" * len(header))
for year in all_years:
cols = []
for name in results:
r = yearly[name].get(year)
cols.append(f"{r*100:>20.1f}%" if r is not None else f"{'':>21}")
print(f"{year:<6}" + "".join(cols))
if __name__ == "__main__":
main()

166
research/sota_ranking.py Normal file
View File

@@ -0,0 +1,166 @@
"""Rank all top strategies head-to-head on the same 10-year PIT-safe data."""
from __future__ import annotations
import sys
sys.path.insert(0, ".")
import numpy as np
import pandas as pd
import data_manager
import metrics
import universe_history as uh
from main import backtest
from trader import STRATEGY_REGISTRY, ETF_STRATEGY_UNIVERSES, MIXED_STRATEGY_EXTRA_TICKERS, filter_tradable_tickers
from universe import UNIVERSES
YEARS = 10
CAPITAL = 100_000
TX_COST = 0.001
FIXED_FEE = 2.0
# Only the most promising strategies — skip redundant freq variants
CANDIDATES = [
# ETF tactical allocation
"trend_rider_v7",
"trend_rider_v7_vt24",
"trend_rider_v7_vt32",
"trend_rider_v3_vt28",
"trend_rider_v3_vt32",
"trend_rider_v5_us",
"trend_rider_v5_panic",
"trend_rider_v3_us",
# V6 hybrids (stock + regime)
"trend_rider_v6",
"trend_rider_v6_top10",
# Stock pickers
"recovery_mom_top10",
"recovery_mom_top20",
"trend_following",
"fc_rec_mfilt_deep_upvol_monthly",
"fc_rec_mfilt_deep_upvol_daily",
# Ensembles
"ensemble_alpha_top10",
"sharpe_boosted_ensemble_top8",
"risk_managed_ensemble_top10",
"enhanced_factor_combo_top10",
]
def main():
print("=" * 95)
print(" COMPREHENSIVE STRATEGY RANKING (10y PIT-safe)")
print("=" * 95)
# Load S&P 500 + PIT
print("\n[1] Loading data...")
universe = UNIVERSES["us"]
tickers = universe["fetch"]()
pit_intervals = uh.load_sp500_history()
hist_tickers = uh.all_tickers_ever(pit_intervals)
# Collect all ETF tickers needed
all_etf = set()
for name in CANDIDATES:
base = name.removeprefix("sim_")
if base in ETF_STRATEGY_UNIVERSES:
all_etf.update(ETF_STRATEGY_UNIVERSES[base])
if base in MIXED_STRATEGY_EXTRA_TICKERS:
all_etf.update(MIXED_STRATEGY_EXTRA_TICKERS[base])
all_etf.update(["SPY", "GLD", "DBC", "SHY", "TQQQ", "UPRO", "TLT", "IEF"])
all_tickers = sorted(set(tickers + hist_tickers + list(all_etf)))
print(f" {len(all_tickers)} tickers to download...")
stock_data = data_manager.update("us", all_tickers, with_open=False)
if isinstance(stock_data, tuple):
stock_data = stock_data[0]
cutoff = stock_data.index[-1] - pd.DateOffset(years=YEARS)
stock_data = stock_data[stock_data.index >= cutoff]
stock_data = uh.mask_prices(stock_data, pit_intervals)
stock_tickers = [t for t in stock_data.columns
if t not in all_etf and stock_data[t].notna().any()]
# Also load pure ETF panel (for pure-ETF strategies that use separate data)
etf_list = sorted(all_etf)
etf_data = data_manager.update("etfs", etf_list, with_open=False)
if isinstance(etf_data, tuple):
etf_data = etf_data[0]
etf_cutoff = etf_data.index[-1] - pd.DateOffset(years=YEARS)
etf_data = etf_data[etf_data.index >= etf_cutoff]
print(f" Stocks: {len(stock_tickers)}, ETFs: {len(etf_list)}")
print(f" Period: {stock_data.index[0].date()}{stock_data.index[-1].date()}")
# Run strategies
print("\n[2] Running strategies...")
results: list[tuple[str, dict]] = []
for name in CANDIDATES:
if name not in STRATEGY_REGISTRY:
print(f" SKIP {name} (not in registry)")
continue
base = name.removeprefix("sim_")
print(f" {name}...", end=" ", flush=True)
try:
if base in ETF_STRATEGY_UNIVERSES:
# Pure ETF strategy
etf_tickers = ETF_STRATEGY_UNIVERSES[base]
tradable = [t for t in etf_tickers if t in etf_data.columns]
strategy = STRATEGY_REGISTRY[name]()
eq = backtest(strategy, etf_data[tradable],
initial_capital=CAPITAL, transaction_cost=TX_COST,
fixed_fee=FIXED_FEE)
elif base in MIXED_STRATEGY_EXTRA_TICKERS:
# Mixed: stocks + ETFs in one panel
extra = MIXED_STRATEGY_EXTRA_TICKERS[base]
panel_cols = stock_tickers + [t for t in extra if t in stock_data.columns]
panel = stock_data[[c for c in panel_cols if c in stock_data.columns]]
strategy = STRATEGY_REGISTRY[name]()
eq = backtest(strategy, panel,
initial_capital=CAPITAL, transaction_cost=TX_COST,
fixed_fee=FIXED_FEE)
else:
# Pure stock strategy
strategy = STRATEGY_REGISTRY[name](top_n=10)
eq = backtest(strategy, stock_data[stock_tickers],
initial_capital=CAPITAL, transaction_cost=TX_COST,
fixed_fee=FIXED_FEE)
m = metrics.raw_summary(eq)
results.append((name, m))
print(f"Ann={m['annualizedReturn']*100:.1f}%")
except Exception as e:
print(f"FAILED: {e}")
# SPY benchmark
spy = stock_data["SPY"].dropna()
spy_eq = (spy / spy.iloc[0]) * CAPITAL
spy_m = metrics.raw_summary(spy_eq)
results.append(("SPY (benchmark)", spy_m))
# Sort by annualized return
results.sort(key=lambda x: x[1]["annualizedReturn"], reverse=True)
print(f"\n[3] Ranking ({YEARS}y, ${CAPITAL:,.0f}, tx={TX_COST*100:.1f}bps + ${FIXED_FEE:.0f}/trade)")
print("=" * 110)
print(f"{'#':<4} {'Strategy':<40} {'Ann%':>8} {'Vol%':>8} {'Sharpe':>8} {'Sortino':>8} {'MaxDD%':>8} {'Calmar':>8}")
print("-" * 110)
for i, (name, m) in enumerate(results, 1):
print(f"{i:<4} {name:<40} "
f"{m['annualizedReturn']*100:>7.1f}% "
f"{m['annualizedVolatility']*100:>7.1f}% "
f"{m['sharpeRatio']:>8.2f} "
f"{m['sortinoRatio']:>8.2f} "
f"{m['maxDrawdown']*100:>7.1f}% "
f"{m['calmarRatio']:>8.2f}")
print("=" * 110)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,612 @@
"""Three directions to break V7+VT36's ceiling (61% Ann, Sharpe 1.89).
Direction A — Multi-strategy ensemble: V7 + stock pickers, capital split.
Direction B — Cross-market V7: sector 3x ETFs (SOXL, TECL, TNA, FAS).
Direction C — Improved regime engine: alternative signals replacing MA150.
"""
from __future__ import annotations
import sys
sys.path.insert(0, ".")
import numpy as np
import pandas as pd
import data_manager
import metrics
import universe_history as uh
from main import backtest
from strategies.base import Strategy
from strategies.permanent import TrendRiderV3
from strategies.trend_rider_v7 import TrendRiderV7
from strategies.recovery_momentum import RecoveryMomentumStrategy
from universe import UNIVERSES
YEARS = 10
CAPITAL = 100_000
TX_COST = 0.001
FIXED_FEE = 2.0
def run_and_report(label, strategy, data_panel, capital=CAPITAL):
eq = backtest(strategy, data_panel, initial_capital=capital,
transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
m = metrics.raw_summary(eq)
return label, eq, m
def print_table(results: list[tuple[str, pd.Series, dict]]):
print(f"{'#':<4} {'Strategy':<52} {'Ann%':>7} {'Vol%':>7} {'Sharpe':>7} "
f"{'Sortino':>8} {'MaxDD%':>7} {'Calmar':>7}")
print("-" * 115)
for i, (label, _, m) in enumerate(results, 1):
marker = "" if i <= 3 else ""
print(f"{i:<4} {label:<52} "
f"{m['annualizedReturn']*100:>6.1f}% "
f"{m['annualizedVolatility']*100:>6.1f}% "
f"{m['sharpeRatio']:>7.2f} "
f"{m['sortinoRatio']:>8.2f} "
f"{m['maxDrawdown']*100:>6.1f}% "
f"{m['calmarRatio']:>7.2f}{marker}")
def ensemble_equity(equities: list[pd.Series], weights: list[float] | None = None
) -> pd.Series:
"""Combine independent equity curves with periodic rebalancing.
Each equity is assumed to start at $CAPITAL.
Returns combined equity as if capital were split according to weights.
"""
if weights is None:
weights = [1.0 / len(equities)] * len(equities)
idx = equities[0].index
for eq in equities[1:]:
idx = idx.intersection(eq.index)
aligned = [eq.reindex(idx).ffill() for eq in equities]
# Combine as weighted sum of normalized curves
combined = pd.Series(0.0, index=idx)
for eq, w in zip(aligned, weights):
combined += (eq / eq.iloc[0]) * w
combined = combined * CAPITAL
return combined
# =========================================================================
# Data loading (shared)
# =========================================================================
def load_all_data():
print("=" * 100)
print(" LOADING ALL DATA")
print("=" * 100)
# S&P 500 + PIT
universe = UNIVERSES["us"]
tickers = universe["fetch"]()
pit_intervals = uh.load_sp500_history()
hist_tickers = uh.all_tickers_ever(pit_intervals)
# All ETFs needed across all three directions
core_etfs = ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY", "TLT"]
sector_etfs = [
"SOXL", "SMH", # 3x semi / semi index
"TECL", "XLK", # 3x tech / tech sector
"TNA", "IWM", # 3x Russell 2000
"FAS", "XLF", # 3x financials
]
regime_etfs = ["VIX", "^VIX"] # VIX for alt regime signals
all_etfs = sorted(set(core_etfs + sector_etfs + regime_etfs))
# Stock data (includes ETFs for mixed strategies)
all_stock_tickers = sorted(set(tickers + hist_tickers + all_etfs))
print(f"\nDownloading {len(all_stock_tickers)} tickers...")
stock_data = data_manager.update("us", all_stock_tickers, with_open=False)
if isinstance(stock_data, tuple):
stock_data = stock_data[0]
cutoff = stock_data.index[-1] - pd.DateOffset(years=YEARS)
stock_data = stock_data[stock_data.index >= cutoff]
stock_data = uh.mask_prices(stock_data, pit_intervals)
# Pure ETF data
etf_data = data_manager.update("etfs", all_etfs, with_open=False)
if isinstance(etf_data, tuple):
etf_data = etf_data[0]
etf_cutoff = etf_data.index[-1] - pd.DateOffset(years=YEARS)
etf_data = etf_data[etf_data.index >= etf_cutoff]
stock_tickers = [t for t in stock_data.columns
if t not in all_etfs and stock_data[t].notna().any()]
print(f"Stocks: {len(stock_tickers)}")
print(f"Period: {stock_data.index[0].date()}{stock_data.index[-1].date()}")
print(f"ETF columns: {sorted(etf_data.columns.tolist())}")
return stock_data, etf_data, stock_tickers, all_etfs
# =========================================================================
# DIRECTION A: Multi-strategy ensemble
# =========================================================================
def direction_a(stock_data, etf_data, stock_tickers, all_etfs):
print("\n" + "=" * 100)
print(" DIRECTION A: MULTI-STRATEGY ENSEMBLE")
print("=" * 100)
results = []
# Baselines
etf_cols = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_data.columns]
label, eq_v7, m = run_and_report(
"V7+VT36 (baseline)", TrendRiderV7(target_vol=0.36, min_lev=0.75), etf_data[etf_cols])
results.append((label, eq_v7, m))
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}")
label, eq_rec, m = run_and_report(
"RecoveryMom Top10 (baseline)", RecoveryMomentumStrategy(top_n=10), stock_data[stock_tickers])
results.append((label, eq_rec, m))
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}")
# Ensembles with different splits
for v7_pct in (0.5, 0.6, 0.7, 0.8):
stock_pct = 1.0 - v7_pct
label = f"Ensemble {int(v7_pct*100)}% V7 + {int(stock_pct*100)}% RecMom"
eq = ensemble_equity([eq_v7, eq_rec], [v7_pct, stock_pct])
m = metrics.raw_summary(eq)
results.append((label, eq, m))
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%")
# Also try V7+VT36 + V7+VT24 (low-vol variant) ensemble
label, eq_v7_lo, m = run_and_report(
"V7+VT24 (low-vol)", TrendRiderV7(target_vol=0.24, min_lev=0.50), etf_data[etf_cols])
results.append((label, eq_v7_lo, m))
eq_v7_duo = ensemble_equity([eq_v7, eq_v7_lo], [0.6, 0.4])
m = metrics.raw_summary(eq_v7_duo)
results.append(("Ensemble 60% V7-VT36 + 40% V7-VT24", eq_v7_duo, m))
print(f" V7-VT36/VT24 blend: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%")
results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True)
print(f"\n--- Direction A Results (sorted by Sharpe) ---")
print_table(results)
return results
# =========================================================================
# DIRECTION B: Cross-market V7 (sector 3x ETFs)
# =========================================================================
def direction_b(stock_data, etf_data, stock_tickers, all_etfs):
print("\n" + "=" * 100)
print(" DIRECTION B: CROSS-MARKET V7 (SECTOR 3x ETFs)")
print("=" * 100)
results = []
# Baseline
etf_cols = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_data.columns]
label, eq_v7, m = run_and_report(
"V7+VT36 SPY→TQQQ/UPRO (baseline)", TrendRiderV7(target_vol=0.36, min_lev=0.75), etf_data[etf_cols])
results.append((label, eq_v7, m))
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%")
# Sector V7 instances
sector_configs = [
("SMH→SOXL (Semiconductor)", "SMH", ("SOXL",)),
("XLK→TECL (Technology)", "XLK", ("TECL",)),
("IWM→TNA (Russell 2000)", "IWM", ("TNA",)),
("XLF→FAS (Financials)", "XLF", ("FAS",)),
]
sector_equities = {}
for desc, signal, risk_on in sector_configs:
needed = [signal] + list(risk_on) + ["GLD", "DBC", "SHY"]
available = [t for t in needed if t in etf_data.columns]
if signal not in available or not any(r in available for r in risk_on):
print(f" SKIP {desc}: missing data ({signal} or {risk_on})")
continue
risk_on_avail = tuple(r for r in risk_on if r in available)
strategy = TrendRiderV7(
signal=signal, risk_on=risk_on_avail, risk_off=("GLD", "DBC"),
target_vol=0.36, min_lev=0.75,
)
label = f"V7+VT36 {desc}"
try:
_, eq, m = run_and_report(label, strategy, etf_data[available])
results.append((label, eq, m))
sector_equities[desc] = eq
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%")
except Exception as e:
print(f" FAILED {label}: {e}")
# Cross-market ensembles
if sector_equities:
# All sectors + SPY equal weight
all_eqs = [eq_v7] + list(sector_equities.values())
eq_all = ensemble_equity(all_eqs)
m = metrics.raw_summary(eq_all)
label = f"Equal-weight all {len(all_eqs)} V7 instances"
results.append((label, eq_all, m))
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%")
# Best 2-3 combinations
if "SMH→SOXL (Semiconductor)" in sector_equities:
eq_spy_semi = ensemble_equity([eq_v7, sector_equities["SMH→SOXL (Semiconductor)"]], [0.5, 0.5])
m = metrics.raw_summary(eq_spy_semi)
results.append(("50% SPY-V7 + 50% SOXL-V7", eq_spy_semi, m))
print(f" SPY+SOXL combo: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%")
eq_spy_semi_70 = ensemble_equity([eq_v7, sector_equities["SMH→SOXL (Semiconductor)"]], [0.7, 0.3])
m = metrics.raw_summary(eq_spy_semi_70)
results.append(("70% SPY-V7 + 30% SOXL-V7", eq_spy_semi_70, m))
if "XLK→TECL (Technology)" in sector_equities:
eq_spy_tech = ensemble_equity([eq_v7, sector_equities["XLK→TECL (Technology)"]], [0.5, 0.5])
m = metrics.raw_summary(eq_spy_tech)
results.append(("50% SPY-V7 + 50% TECL-V7", eq_spy_tech, m))
if len(sector_equities) >= 2:
# SPY + top 2 sectors
sorted_sectors = sorted(sector_equities.items(),
key=lambda x: metrics.raw_summary(x[1])["sharpeRatio"],
reverse=True)
top2 = sorted_sectors[:2]
eq_best3 = ensemble_equity([eq_v7] + [eq for _, eq in top2],
[0.5] + [0.25] * 2)
m = metrics.raw_summary(eq_best3)
label = f"50% SPY-V7 + 25% {top2[0][0][:4]}.. + 25% {top2[1][0][:4]}.."
results.append((label, eq_best3, m))
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%")
results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True)
print(f"\n--- Direction B Results (sorted by Sharpe) ---")
print_table(results)
return results
# =========================================================================
# DIRECTION C: Improved regime engine
# =========================================================================
class V7AltRegime(Strategy):
"""V7 with pluggable regime function replacing V3._desired_regime."""
def __init__(
self,
regime_func,
signal: str = "SPY",
risk_on: tuple[str, ...] = ("TQQQ", "UPRO"),
risk_off: tuple[str, ...] = ("GLD", "DBC"),
target_vol: float = 0.36,
vol_window: int = 60,
min_lev: float = 0.75,
max_lev: float = 1.0,
pt_threshold: float = 0.30,
pt_band: float = 0.10,
pt_park: str = "SHY",
ma_long: int = 150,
mom_lookback: int = 63,
min_hold: int = 15,
):
self.regime_func = regime_func
self.signal = signal
self.risk_on = risk_on
self.risk_off = risk_off
self.target_vol = target_vol
self.vol_window = vol_window
self.min_lev = min_lev
self.max_lev = max_lev
self.pt_threshold = pt_threshold
self.pt_band = pt_band
self.pt_park = pt_park
self.mom_lookback = mom_lookback
self.min_hold = min_hold
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
cols = list({self.signal, *self.risk_on, *self.risk_off, self.pt_park})
cols = [c for c in cols if c in data.columns]
w = pd.DataFrame(0.0, index=data.index, columns=cols)
if self.signal not in data.columns:
return w
sig_arr = data[self.signal].to_numpy()
sym_arrays = {s: data[s].to_numpy() for s in cols if s in data.columns}
ron_syms = [s for s in self.risk_on if s in data.columns]
roff_syms = [s for s in self.risk_off if s in data.columns]
need = 252
regime: str | None = None
bars = 0
def pick_best(basket, t):
best_s, best_r = None, -np.inf
for s in basket:
arr = sym_arrays.get(s)
if arr is None or t < self.mom_lookback + 1:
continue
if arr[t-1] <= 0 or np.isnan(arr[t-1]) or arr[t - self.mom_lookback - 1] <= 0:
continue
r = arr[t-1] / arr[t - self.mom_lookback - 1] - 1.0
if np.isfinite(r) and r > best_r:
best_r, best_s = r, s
return best_s
for t in range(len(data)):
if t < need:
continue
closes = sig_arr[:t]
if np.isnan(closes[-1]):
continue
desired = self.regime_func(closes, regime)
changed = False
if regime is None:
regime, bars, changed = desired, 0, True
else:
bars += 1
if desired != regime and bars >= self.min_hold:
regime, bars, changed = desired, 0, True
if not changed and (t - need) % 21 != 0:
continue
basket = ron_syms if regime == "risk_on" else roff_syms
pick = pick_best(basket, t)
if pick:
w.iat[t, cols.index(pick)] = 1.0
w = w.replace(0.0, np.nan).ffill().fillna(0.0)
w = w.shift(1).fillna(0.0)
# Vol-target overlay
daily_ret = data[cols].pct_change(fill_method=None).fillna(0.0)
port_rets = (w * daily_ret).sum(axis=1)
realized_vol = port_rets.rolling(self.vol_window, min_periods=21).std() * np.sqrt(252)
scale = (self.target_vol / realized_vol).clip(lower=self.min_lev, upper=self.max_lev)
scale = scale.shift(1).fillna(1.0)
w = w.mul(scale, axis=0)
# Profit-take
if self.pt_threshold <= 0:
return w
held = w.idxmax(axis=1)
max_w = w.max(axis=1)
held[max_w < 1e-8] = ""
park_col = self.pt_park if self.pt_park in w.columns else ""
entry_price = None
current_sym = None
is_stopped = False
restore_level = self.pt_threshold - self.pt_band
for i in range(len(w)):
sym = held.iloc[i]
if not sym or max_w.iloc[i] < 1e-8:
current_sym, entry_price, is_stopped = None, None, False
continue
if sym != current_sym:
current_sym = sym
entry_price = float(data[sym].iloc[i-1]) if i > 0 and sym in data.columns else None
is_stopped = False
continue
if entry_price is None or entry_price <= 0 or sym not in data.columns:
continue
yesterday = float(data[sym].iloc[i-1]) if i > 0 else float(data[sym].iloc[i])
gain = yesterday / entry_price - 1.0
if is_stopped:
if gain < restore_level:
is_stopped = False
else:
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
else:
if gain >= self.pt_threshold:
is_stopped = True
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
return w
# Regime functions
def regime_ma(window: int):
"""Simple MA crossover: above MA → risk_on."""
def fn(closes, current):
if len(closes) < window:
return "risk_off"
return "risk_on" if closes[-1] > np.mean(closes[-window:]) else "risk_off"
return fn
def regime_dual_ma(short: int = 50, long: int = 200):
"""Golden/death cross: MA_short > MA_long → risk_on."""
def fn(closes, current):
if len(closes) < long:
return "risk_off"
ma_s = np.mean(closes[-short:])
ma_l = np.mean(closes[-long:])
return "risk_on" if ma_s > ma_l else "risk_off"
return fn
def regime_roc(window: int = 63):
"""Rate of change: positive N-day return → risk_on."""
def fn(closes, current):
if len(closes) < window + 1 or closes[-window-1] <= 0:
return "risk_off"
roc = closes[-1] / closes[-window-1] - 1.0
return "risk_on" if roc > 0 else "risk_off"
return fn
def regime_ma_plus_vol(ma_window: int = 150, vol_window: int = 20, vol_cap: float = 0.20):
"""MA + vol filter: above MA AND vol < cap → risk_on."""
def fn(closes, current):
if len(closes) < max(ma_window, vol_window + 1):
return "risk_off"
above_ma = closes[-1] > np.mean(closes[-ma_window:])
if not above_ma:
return "risk_off"
rets = np.diff(closes[-vol_window-1:]) / np.maximum(closes[-vol_window-1:-1], 1e-12)
vol = float(np.std(rets, ddof=1) * np.sqrt(252))
return "risk_on" if vol < vol_cap else "risk_off"
return fn
def regime_ma_slope(ma_window: int = 150, slope_window: int = 10):
"""MA + positive slope: above MA AND MA is rising → risk_on."""
def fn(closes, current):
if len(closes) < ma_window + slope_window:
return "risk_off"
ma_now = np.mean(closes[-ma_window:])
ma_prev = np.mean(closes[-ma_window - slope_window:-slope_window])
above = closes[-1] > ma_now
rising = ma_now > ma_prev
return "risk_on" if (above and rising) else "risk_off"
return fn
def regime_composite(ma_w: int = 150, roc_w: int = 63, vol_w: int = 20,
vol_cap: float = 0.22, threshold: int = 2):
"""Composite: score from MA + ROC + vol. Need ≥ threshold signals for risk_on."""
def fn(closes, current):
if len(closes) < max(ma_w, roc_w + 1, vol_w + 1):
return "risk_off"
score = 0
# Signal 1: above MA
if closes[-1] > np.mean(closes[-ma_w:]):
score += 1
# Signal 2: positive ROC
if closes[-roc_w-1] > 0 and closes[-1] / closes[-roc_w-1] - 1.0 > 0:
score += 1
# Signal 3: vol below cap
rets = np.diff(closes[-vol_w-1:]) / np.maximum(closes[-vol_w-1:-1], 1e-12)
vol = float(np.std(rets, ddof=1) * np.sqrt(252))
if vol < vol_cap:
score += 1
return "risk_on" if score >= threshold else "risk_off"
return fn
def regime_adaptive_ma(fast: int = 100, slow: int = 200, vol_w: int = 60,
vol_threshold: float = 0.18):
"""Adaptive MA: use fast MA in low vol, slow MA in high vol.
High vol → slower signal → fewer whipsaws."""
def fn(closes, current):
if len(closes) < slow:
return "risk_off"
rets = np.diff(closes[-vol_w-1:]) / np.maximum(closes[-vol_w-1:-1], 1e-12)
vol = float(np.std(rets, ddof=1) * np.sqrt(252))
ma_w = slow if vol > vol_threshold else fast
return "risk_on" if closes[-1] > np.mean(closes[-ma_w:]) else "risk_off"
return fn
def direction_c(stock_data, etf_data, stock_tickers, all_etfs):
print("\n" + "=" * 100)
print(" DIRECTION C: IMPROVED REGIME ENGINE")
print("=" * 100)
etf_cols = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_data.columns]
results = []
# V7+VT36 baseline (uses V3's full regime with MA+vol+dd+peak gates)
label, eq, m = run_and_report(
"V7+VT36 (V3 full regime, baseline)", TrendRiderV7(target_vol=0.36, min_lev=0.75), etf_data[etf_cols])
results.append((label, eq, m))
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}")
regime_configs = [
# Simple MA variants
("Simple MA100", regime_ma(100)),
("Simple MA150", regime_ma(150)),
("Simple MA200", regime_ma(200)),
# Dual MA
("Dual MA 50/200", regime_dual_ma(50, 200)),
("Dual MA 50/150", regime_dual_ma(50, 150)),
("Dual MA 20/100", regime_dual_ma(20, 100)),
# ROC
("ROC 63d", regime_roc(63)),
("ROC 126d", regime_roc(126)),
# MA + vol filter
("MA150 + Vol<20%", regime_ma_plus_vol(150, 20, 0.20)),
("MA150 + Vol<25%", regime_ma_plus_vol(150, 20, 0.25)),
("MA200 + Vol<20%", regime_ma_plus_vol(200, 20, 0.20)),
# MA + slope
("MA150 + Rising (10d)", regime_ma_slope(150, 10)),
("MA150 + Rising (20d)", regime_ma_slope(150, 20)),
# Composite
("Composite 2/3 (MA150+ROC63+Vol)", regime_composite(150, 63, 20, 0.22, 2)),
("Composite 3/3 (all must agree)", regime_composite(150, 63, 20, 0.22, 3)),
# Adaptive MA
("Adaptive MA100/200 (vol pivot 18%)", regime_adaptive_ma(100, 200, 60, 0.18)),
("Adaptive MA100/200 (vol pivot 22%)", regime_adaptive_ma(100, 200, 60, 0.22)),
]
for label, regime_fn in regime_configs:
try:
strategy = V7AltRegime(regime_func=regime_fn)
_, eq, m = run_and_report(f"AltRegime: {label}", strategy, etf_data[etf_cols])
results.append((f"AltRegime: {label}", eq, m))
print(f" {label}: Ann={m['annualizedReturn']*100:.1f}%, Sharpe={m['sharpeRatio']:.2f}, MaxDD={m['maxDrawdown']*100:.1f}%")
except Exception as e:
print(f" FAILED {label}: {e}")
results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True)
print(f"\n--- Direction C Results (sorted by Sharpe) ---")
print_table(results)
return results
# =========================================================================
# MAIN
# =========================================================================
def main():
stock_data, etf_data, stock_tickers, all_etfs = load_all_data()
results_a = direction_a(stock_data, etf_data, stock_tickers, all_etfs)
results_b = direction_b(stock_data, etf_data, stock_tickers, all_etfs)
results_c = direction_c(stock_data, etf_data, stock_tickers, all_etfs)
# Final summary
print("\n" + "=" * 100)
print(" CROSS-DIRECTION SUMMARY")
print("=" * 100)
all_results = (
[(f"[A] {l}", eq, m) for l, eq, m in results_a] +
[(f"[B] {l}", eq, m) for l, eq, m in results_b] +
[(f"[C] {l}", eq, m) for l, eq, m in results_c]
)
all_results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True)
print(f"\nTop 10 by Sharpe across all directions:")
print(f"{'#':<4} {'Strategy':<60} {'Ann%':>7} {'Sharpe':>7} {'MaxDD%':>7} {'Calmar':>7}")
print("-" * 100)
for i, (label, _, m) in enumerate(all_results[:10], 1):
print(f"{i:<4} {label:<60} "
f"{m['annualizedReturn']*100:>6.1f}% "
f"{m['sharpeRatio']:>7.2f} "
f"{m['maxDrawdown']*100:>6.1f}% "
f"{m['calmarRatio']:>7.2f}")
print(f"\nTop 10 by Ann. Return across all directions:")
all_results.sort(key=lambda x: x[2]["annualizedReturn"], reverse=True)
print(f"{'#':<4} {'Strategy':<60} {'Ann%':>7} {'Sharpe':>7} {'MaxDD%':>7} {'Calmar':>7}")
print("-" * 100)
for i, (label, _, m) in enumerate(all_results[:10], 1):
print(f"{i:<4} {label:<60} "
f"{m['annualizedReturn']*100:>6.1f}% "
f"{m['sharpeRatio']:>7.2f} "
f"{m['maxDrawdown']*100:>6.1f}% "
f"{m['calmarRatio']:>7.2f}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,282 @@
"""Fixed re-run for Directions B and C based on review feedback.
Direction B fix: recalibrate V3 thresholds per-sector (scale by vol ratio).
Direction C fix: monkey-patch V3._desired_regime inside real V7, preserving
the full state machine (confirm_days, cooloff, stop_loss, dd_stop).
"""
from __future__ import annotations
import sys
sys.path.insert(0, ".")
import numpy as np
import pandas as pd
import data_manager
import metrics
from main import backtest
from strategies.trend_rider_v7 import TrendRiderV7
YEARS = 10
CAPITAL = 100_000
TX_COST = 0.001
FIXED_FEE = 2.0
def load_etf_data():
all_etfs = sorted(set([
"SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY",
"SOXL", "SMH", "TECL", "XLK", "TNA", "IWM", "FAS", "XLF",
]))
data = data_manager.update("etfs", all_etfs, with_open=False)
if isinstance(data, tuple):
data = data[0]
cutoff = data.index[-1] - pd.DateOffset(years=YEARS)
return data[data.index >= cutoff]
def run(label, strategy, panel):
eq = backtest(strategy, panel, initial_capital=CAPITAL,
transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
m = metrics.raw_summary(eq)
print(f" {label:<55} Ann={m['annualizedReturn']*100:>5.1f}% "
f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}% "
f"Sortino={m['sortinoRatio']:.2f} Calmar={m['calmarRatio']:.2f}")
return label, eq, m
# =========================================================================
# DIRECTION B FIX: per-sector calibrated thresholds
# =========================================================================
def direction_b_fixed(etf_data):
print("\n" + "=" * 100)
print(" DIRECTION B FIXED: Sector V7 with recalibrated thresholds")
print("=" * 100)
results = []
core = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_data.columns]
# Baseline
r = run("V7+VT36 baseline (SPY→TQQQ/UPRO)", TrendRiderV7(target_vol=0.36, min_lev=0.75), etf_data[core])
results.append(r)
eq_v7 = r[1]
# Estimate vol ratios for threshold scaling
rets = etf_data.pct_change(fill_method=None).dropna()
spy_vol = rets["SPY"].std() * np.sqrt(252) if "SPY" in rets.columns else 0.18
print(f"\n SPY realized vol: {spy_vol:.1%}")
sector_configs = [
("SMH", ("SOXL",), "Semiconductor"),
("XLK", ("TECL",), "Technology"),
("IWM", ("TNA",), "Russell 2000"),
("XLF", ("FAS",), "Financials"),
]
sector_eqs = {}
for signal, risk_on, name in sector_configs:
if signal not in etf_data.columns or risk_on[0] not in etf_data.columns:
print(f" SKIP {name}: missing data")
continue
sig_vol = rets[signal].std() * np.sqrt(252) if signal in rets.columns else spy_vol
vol_ratio = sig_vol / spy_vol
print(f" {signal} vol: {sig_vol:.1%}, ratio to SPY: {vol_ratio:.2f}")
needed = [signal] + list(risk_on) + ["GLD", "DBC", "SHY"]
panel = etf_data[[t for t in needed if t in etf_data.columns]]
# Uncalibrated (original V3 thresholds)
v7_raw = TrendRiderV7(
signal=signal, risk_on=risk_on, risk_off=("GLD", "DBC"),
target_vol=0.36, min_lev=0.75,
)
r = run(f" {name} UNCALIBRATED", v7_raw, panel)
results.append(r)
# Calibrated: scale vol/dd/peak thresholds by vol ratio
v7_cal = TrendRiderV7(
signal=signal, risk_on=risk_on, risk_off=("GLD", "DBC"),
target_vol=0.36, min_lev=0.75,
# V3 thresholds scaled by sector vol ratio
vol_enter=0.14 * vol_ratio,
vol_exit=0.20 * vol_ratio,
dd_stop=0.05 * vol_ratio,
peak_enter=0.02 * vol_ratio,
peak_exit=0.05 * vol_ratio,
)
r = run(f" {name} CALIBRATED (×{vol_ratio:.1f})", v7_cal, panel)
results.append(r)
sector_eqs[name] = r[1]
# Ensembles with calibrated sectors
if sector_eqs:
print()
for name, sec_eq in sector_eqs.items():
for v7_pct in (0.5, 0.7):
idx = eq_v7.index.intersection(sec_eq.index)
v7_a = eq_v7.reindex(idx).ffill()
sec_a = sec_eq.reindex(idx).ffill()
ens = (v7_a / v7_a.iloc[0]) * v7_pct + (sec_a / sec_a.iloc[0]) * (1 - v7_pct)
ens = ens * CAPITAL
m = metrics.raw_summary(ens)
label = f" {int(v7_pct*100)}% SPY-V7 + {int((1-v7_pct)*100)}% {name[:8]}-V7 (cal)"
print(f" {label:<55} Ann={m['annualizedReturn']*100:>5.1f}% "
f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}% "
f"Sortino={m['sortinoRatio']:.2f} Calmar={m['calmarRatio']:.2f}")
results.append((label, ens, m))
return results
# =========================================================================
# DIRECTION C FIX: inject alt regime into REAL V3 state machine
# =========================================================================
def direction_c_fixed(etf_data):
print("\n" + "=" * 100)
print(" DIRECTION C FIXED: Alt regimes inside real V3 state machine")
print("=" * 100)
core = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_data.columns]
results = []
# Baseline
r = run("V7+VT36 (V3 full regime, baseline)", TrendRiderV7(target_vol=0.36, min_lev=0.75), etf_data[core])
results.append(r)
# Alt regimes: monkey-patch V3._desired_regime, preserving full FSM
def make_alt_v7(regime_fn, label):
v7 = TrendRiderV7(target_vol=0.36, min_lev=0.75)
v7.v3._desired_regime = regime_fn
return v7
# --- Simple MA variants ---
for window in (100, 150, 200, 250):
def regime_ma(closes, current, w=window):
if len(closes) < w:
return "risk_off"
return "risk_on" if closes[-1] > np.mean(closes[-w:]) else "risk_off"
r = run(f"Simple MA{window}", make_alt_v7(regime_ma, f"MA{window}"), etf_data[core])
results.append(r)
# --- Dual MA crossover ---
for short, long in ((50, 200), (50, 150), (20, 100)):
def regime_dual(closes, current, s=short, l=long):
if len(closes) < l:
return "risk_off"
return "risk_on" if np.mean(closes[-s:]) > np.mean(closes[-l:]) else "risk_off"
r = run(f"Dual MA {short}/{long}", make_alt_v7(regime_dual, ""), etf_data[core])
results.append(r)
# --- ROC variants ---
for window in (42, 63, 126):
def regime_roc(closes, current, w=window):
if len(closes) < w + 1 or closes[-w-1] <= 0:
return "risk_off"
return "risk_on" if closes[-1] / closes[-w-1] > 1.0 else "risk_off"
r = run(f"ROC {window}d", make_alt_v7(regime_roc, ""), etf_data[core])
results.append(r)
# --- MA + vol filter (simplified V3) ---
for ma_w, vol_cap in ((150, 0.20), (150, 0.25), (200, 0.22)):
def regime_mavol(closes, current, mw=ma_w, vc=vol_cap):
if len(closes) < max(mw, 21):
return "risk_off"
above = closes[-1] > np.mean(closes[-mw:])
if not above:
return "risk_off"
rets = np.diff(closes[-21:]) / np.maximum(closes[-21:-1], 1e-12)
vol = float(np.std(rets, ddof=1) * np.sqrt(252))
return "risk_on" if vol < vc else "risk_off"
r = run(f"MA{ma_w} + Vol<{int(vol_cap*100)}%", make_alt_v7(regime_mavol, ""), etf_data[core])
results.append(r)
# --- Composite (MA + ROC + vol) ---
for thresh in (2, 3):
def regime_comp(closes, current, t=thresh):
if len(closes) < 200:
return "risk_off"
score = 0
if closes[-1] > np.mean(closes[-150:]):
score += 1
if closes[-64] > 0 and closes[-1] / closes[-64] > 1.0:
score += 1
rets = np.diff(closes[-21:]) / np.maximum(closes[-21:-1], 1e-12)
if np.std(rets, ddof=1) * np.sqrt(252) < 0.22:
score += 1
return "risk_on" if score >= t else "risk_off"
r = run(f"Composite {thresh}/3", make_alt_v7(regime_comp, ""), etf_data[core])
results.append(r)
# --- MA + slope (MA must be rising) ---
for slope_w in (10, 20):
def regime_slope(closes, current, sw=slope_w):
if len(closes) < 150 + sw:
return "risk_off"
ma_now = np.mean(closes[-150:])
ma_prev = np.mean(closes[-150-sw:-sw])
return "risk_on" if (closes[-1] > ma_now and ma_now > ma_prev) else "risk_off"
r = run(f"MA150 + Rising({slope_w}d)", make_alt_v7(regime_slope, ""), etf_data[core])
results.append(r)
# --- Adaptive MA (fast in low vol, slow in high vol) ---
for pivot in (0.15, 0.18, 0.22):
def regime_adapt(closes, current, p=pivot):
if len(closes) < 200:
return "risk_off"
rets = np.diff(closes[-61:]) / np.maximum(closes[-61:-1], 1e-12)
vol = np.std(rets, ddof=1) * np.sqrt(252)
w = 200 if vol > p else 100
return "risk_on" if closes[-1] > np.mean(closes[-w:]) else "risk_off"
r = run(f"Adaptive MA (pivot={int(pivot*100)}%)", make_alt_v7(regime_adapt, ""), etf_data[core])
results.append(r)
# Sort and display
results.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True)
print(f"\n--- Direction C FIXED Results (sorted by Sharpe) ---")
for i, (label, _, m) in enumerate(results, 1):
marker = "" if i <= 3 else ""
print(f" {i:<3} {label:<55} Ann={m['annualizedReturn']*100:>5.1f}% "
f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}% "
f"Calmar={m['calmarRatio']:.2f}{marker}")
return results
def main():
print("=" * 100)
print(" V7 BREAKTHROUGH EVAL — FIXED RE-RUN (per review feedback)")
print("=" * 100)
etf_data = load_etf_data()
print(f"Period: {etf_data.index[0].date()}{etf_data.index[-1].date()}")
print(f"ETFs: {sorted(etf_data.columns.tolist())}")
results_b = direction_b_fixed(etf_data)
results_c = direction_c_fixed(etf_data)
# Cross-direction top 10
all_r = [(f"[B] {l}", eq, m) for l, eq, m in results_b] + \
[(f"[C] {l}", eq, m) for l, eq, m in results_c]
all_r.sort(key=lambda x: x[2]["sharpeRatio"], reverse=True)
print(f"\n{'=' * 100}")
print(" FINAL: Top 10 by Sharpe")
print(f"{'=' * 100}")
for i, (label, _, m) in enumerate(all_r[:10], 1):
print(f" {i:<3} {label:<60} Ann={m['annualizedReturn']*100:>5.1f}% "
f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}% "
f"Calmar={m['calmarRatio']:.2f}")
all_r.sort(key=lambda x: x[2]["annualizedReturn"], reverse=True)
print(f"\n FINAL: Top 10 by Ann. Return")
print(f" {'-' * 95}")
for i, (label, _, m) in enumerate(all_r[:10], 1):
print(f" {i:<3} {label:<60} Ann={m['annualizedReturn']*100:>5.1f}% "
f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}% "
f"Calmar={m['calmarRatio']:.2f}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,382 @@
"""V7 parameter sweep: vol-target range + adaptive profit-take variants.
Direction 1: higher vol-target (VT24 → VT48)
Direction 3: adaptive profit-take (vol-scaled, time-decay, combined)
"""
from __future__ import annotations
import sys
sys.path.insert(0, ".")
import numpy as np
import pandas as pd
import data_manager
import metrics
from main import backtest
from strategies.base import Strategy
from strategies.permanent import TrendRiderV3
YEARS = 10
CAPITAL = 100_000
TX_COST = 0.001
FIXED_FEE = 2.0
ETF_TICKERS = ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"]
# ---------------------------------------------------------------------------
# Adaptive V7: modular profit-take that accepts a callable threshold
# ---------------------------------------------------------------------------
class TrendRiderV7Adaptive(Strategy):
"""V7 with pluggable profit-take logic.
pt_func(gain, realized_vol, days_held) -> (threshold, restore_level)
If pt_func is None, no profit-take is applied.
"""
def __init__(
self,
ma_long: int = 150,
signal: str = "SPY",
risk_on: tuple[str, ...] = ("TQQQ", "UPRO"),
risk_off: tuple[str, ...] = ("GLD", "DBC"),
target_vol: float = 0.28,
vol_window: int = 60,
min_lev: float = 0.6,
max_lev: float = 1.0,
pt_func=None,
pt_park: str = "SHY",
**v3_kwargs,
) -> None:
self.target_vol = target_vol
self.vol_window = vol_window
self.min_lev = min_lev
self.max_lev = max_lev
self.pt_func = pt_func
self.pt_park = pt_park
self.v3 = TrendRiderV3(
signal=signal, risk_on=risk_on, risk_off=risk_off,
ma_long=ma_long, **v3_kwargs,
)
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
w = self.v3.generate_signals(data)
# Vol-target overlay
daily_ret = data.pct_change(fill_method=None).fillna(0.0)
port_rets = (w * daily_ret).sum(axis=1)
realized_vol = (
port_rets.rolling(self.vol_window, min_periods=21).std()
* np.sqrt(252)
)
scale = (self.target_vol / realized_vol).clip(
lower=self.min_lev, upper=self.max_lev,
)
scale = scale.shift(1).fillna(1.0)
w = w.mul(scale, axis=0)
if self.pt_func is None:
return w
# Adaptive profit-take
held = w.idxmax(axis=1)
max_w = w.max(axis=1)
held[max_w < 1e-8] = ""
park_col = self.pt_park if self.pt_park in w.columns else ""
entry_price: float | None = None
current_sym: str | None = None
is_stopped = False
days_held = 0
for i in range(len(w)):
sym = held.iloc[i]
if not sym or max_w.iloc[i] < 1e-8:
current_sym = None
entry_price = None
is_stopped = False
days_held = 0
continue
if sym != current_sym:
current_sym = sym
entry_price = (
float(data[sym].iloc[i - 1])
if i > 0 and sym in data.columns else None
)
is_stopped = False
days_held = 0
continue
days_held += 1
if entry_price is None or entry_price <= 0 or sym not in data.columns:
continue
yesterday = float(data[sym].iloc[i - 1]) if i > 0 else float(data[sym].iloc[i])
gain = yesterday / entry_price - 1.0
rv = float(realized_vol.iloc[i]) if not np.isnan(realized_vol.iloc[i]) else 0.25
threshold, restore_level = self.pt_func(gain, rv, days_held)
if is_stopped:
if gain < restore_level:
is_stopped = False
else:
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
else:
if gain >= threshold:
is_stopped = True
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
return w
# ---------------------------------------------------------------------------
# Profit-take function factories
# ---------------------------------------------------------------------------
def fixed_pt(threshold: float, band: float):
"""Classic fixed threshold (V7 default)."""
def fn(gain, rv, days_held):
return threshold, threshold - band
return fn
def vol_adaptive_pt(base_threshold: float = 0.30, base_vol: float = 0.25,
band_ratio: float = 0.33, lo: float = 0.15, hi: float = 0.50):
"""Threshold scales inversely with realized vol.
High vol → lower threshold (harvest earlier, vol drag is worse).
Low vol → higher threshold (let profits run, drag is mild).
"""
def fn(gain, rv, days_held):
rv = max(rv, 0.05)
t = np.clip(base_threshold * (base_vol / rv), lo, hi)
return t, t * (1 - band_ratio)
return fn
def time_decay_pt(start_threshold: float = 0.40, end_threshold: float = 0.18,
decay_days: int = 120, band_ratio: float = 0.33):
"""Threshold decays linearly over holding period.
Rationale: longer holds accumulate more vol drag → take profits earlier.
"""
def fn(gain, rv, days_held):
frac = min(days_held / decay_days, 1.0)
t = start_threshold - frac * (start_threshold - end_threshold)
return t, t * (1 - band_ratio)
return fn
def combined_pt(base_threshold: float = 0.30, base_vol: float = 0.25,
time_decay_rate: float = 0.0005, min_threshold: float = 0.12,
max_threshold: float = 0.50, band_ratio: float = 0.33):
"""Vol-adaptive + time decay combined."""
def fn(gain, rv, days_held):
rv = max(rv, 0.05)
vol_adj = base_threshold * (base_vol / rv)
time_adj = vol_adj - days_held * time_decay_rate
t = np.clip(time_adj, min_threshold, max_threshold)
return t, t * (1 - band_ratio)
return fn
def trailing_stop_pt(initial_threshold: float = 0.30, trail_pct: float = 0.15,
band_ratio: float = 0.33):
"""Once gain exceeds threshold, switch to trailing stop from peak gain.
Lets winners run further but protects from reversal.
"""
# We need state across calls, so use a mutable closure
state = {"peak_gain": 0.0, "trailing_active": False}
def fn(gain, rv, days_held):
if days_held == 1:
state["peak_gain"] = 0.0
state["trailing_active"] = False
if state["trailing_active"]:
state["peak_gain"] = max(state["peak_gain"], gain)
trail_level = state["peak_gain"] * (1 - trail_pct)
if gain < trail_level:
return -1.0, -1.0 # trigger immediately
return float("inf"), float("inf") # don't trigger via threshold
else:
if gain >= initial_threshold:
state["trailing_active"] = True
state["peak_gain"] = gain
return float("inf"), float("inf")
return initial_threshold, initial_threshold * (1 - band_ratio)
return fn
# ---------------------------------------------------------------------------
# Main sweep
# ---------------------------------------------------------------------------
def main():
print("=" * 100)
print(" V7 PARAMETER SWEEP: Vol-Target + Adaptive Profit-Take")
print("=" * 100)
# Load ETF data
print("\n[1] Loading ETF data...")
etf_data = data_manager.update("etfs", ETF_TICKERS, with_open=False)
if isinstance(etf_data, tuple):
etf_data = etf_data[0]
cutoff = etf_data.index[-1] - pd.DateOffset(years=YEARS)
etf_data = etf_data[etf_data.index >= cutoff]
tradable = [t for t in ETF_TICKERS if t in etf_data.columns]
print(f" Period: {etf_data.index[0].date()}{etf_data.index[-1].date()}")
results: list[tuple[str, str, dict]] = []
def run(group: str, label: str, strategy: Strategy):
eq = backtest(strategy, etf_data[tradable], initial_capital=CAPITAL,
transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
m = metrics.raw_summary(eq)
results.append((group, label, m))
print(f" {label:<45} Ann={m['annualizedReturn']*100:.1f}% "
f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}%")
# =====================================================================
# SWEEP 1: Vol-target range (with fixed PT30)
# =====================================================================
print("\n[2] Sweep 1: Vol-target range (PT30 fixed)")
print("-" * 70)
vt_configs = [
("VT20", 0.20, 0.45),
("VT24", 0.24, 0.50),
("VT28 (default)", 0.28, 0.60),
("VT32", 0.32, 0.70),
("VT36", 0.36, 0.75),
("VT40", 0.40, 0.80),
("VT44", 0.44, 0.85),
("VT48", 0.48, 0.90),
("No VT (raw V3+PT30)", 0.28, 1.0), # min_lev=max_lev=1.0 → no scaling
]
for label, tv, ml in vt_configs:
if label.startswith("No VT"):
s = TrendRiderV7Adaptive(target_vol=1.0, min_lev=1.0, max_lev=1.0,
pt_func=fixed_pt(0.30, 0.10))
else:
s = TrendRiderV7Adaptive(target_vol=tv, min_lev=ml,
pt_func=fixed_pt(0.30, 0.10))
run("VT sweep", label, s)
# =====================================================================
# SWEEP 2: Profit-take variants (using best VT from sweep 1)
# =====================================================================
print("\n[3] Sweep 2: Profit-take variants (VT32)")
print("-" * 70)
best_vt = 0.32
best_ml = 0.70
pt_configs: list[tuple[str, object]] = [
# Fixed thresholds
("No PT (ablation)", None),
("Fixed PT15 band=5", fixed_pt(0.15, 0.05)),
("Fixed PT20 band=8", fixed_pt(0.20, 0.08)),
("Fixed PT25 band=10", fixed_pt(0.25, 0.10)),
("Fixed PT30 band=10 (default)", fixed_pt(0.30, 0.10)),
("Fixed PT35 band=12", fixed_pt(0.35, 0.12)),
("Fixed PT40 band=15", fixed_pt(0.40, 0.15)),
("Fixed PT50 band=15", fixed_pt(0.50, 0.15)),
# Vol-adaptive
("Vol-adaptive (base=30%, lo=15%)", vol_adaptive_pt(0.30, 0.25, 0.33, 0.15, 0.50)),
("Vol-adaptive (base=25%, lo=12%)", vol_adaptive_pt(0.25, 0.25, 0.33, 0.12, 0.45)),
("Vol-adaptive (base=35%, lo=18%)", vol_adaptive_pt(0.35, 0.25, 0.33, 0.18, 0.55)),
# Time-decay
("Time-decay (40%→18%, 120d)", time_decay_pt(0.40, 0.18, 120)),
("Time-decay (35%→15%, 90d)", time_decay_pt(0.35, 0.15, 90)),
("Time-decay (45%→20%, 150d)", time_decay_pt(0.45, 0.20, 150)),
# Combined
("Combined vol+time (base=30%)", combined_pt(0.30, 0.25, 0.0005, 0.12, 0.50)),
("Combined vol+time (base=25%)", combined_pt(0.25, 0.25, 0.0005, 0.10, 0.45)),
]
for label, pt_fn in pt_configs:
s = TrendRiderV7Adaptive(target_vol=best_vt, min_lev=best_ml,
pt_func=pt_fn)
run("PT sweep", label, s)
# =====================================================================
# SWEEP 3: Best PT × VT grid (narrow search around top combos)
# =====================================================================
print("\n[4] Sweep 3: Best combos (VT × PT grid)")
print("-" * 70)
grid = [
(0.32, 0.70, "Fixed PT30", fixed_pt(0.30, 0.10)),
(0.36, 0.75, "Fixed PT30", fixed_pt(0.30, 0.10)),
(0.40, 0.80, "Fixed PT30", fixed_pt(0.30, 0.10)),
(0.32, 0.70, "Fixed PT25", fixed_pt(0.25, 0.10)),
(0.36, 0.75, "Fixed PT25", fixed_pt(0.25, 0.10)),
(0.40, 0.80, "Fixed PT25", fixed_pt(0.25, 0.10)),
(0.32, 0.70, "Vol-adapt 30%", vol_adaptive_pt(0.30, 0.25, 0.33, 0.15, 0.50)),
(0.36, 0.75, "Vol-adapt 30%", vol_adaptive_pt(0.30, 0.25, 0.33, 0.15, 0.50)),
(0.40, 0.80, "Vol-adapt 30%", vol_adaptive_pt(0.30, 0.25, 0.33, 0.15, 0.50)),
(0.32, 0.70, "Time-decay 40→18", time_decay_pt(0.40, 0.18, 120)),
(0.36, 0.75, "Time-decay 40→18", time_decay_pt(0.40, 0.18, 120)),
(0.40, 0.80, "Time-decay 40→18", time_decay_pt(0.40, 0.18, 120)),
(0.32, 0.70, "Combined 30%", combined_pt(0.30, 0.25, 0.0005, 0.12, 0.50)),
(0.36, 0.75, "Combined 30%", combined_pt(0.30, 0.25, 0.0005, 0.12, 0.50)),
(0.40, 0.80, "Combined 30%", combined_pt(0.30, 0.25, 0.0005, 0.12, 0.50)),
]
for tv, ml, pt_label, pt_fn in grid:
label = f"VT{int(tv*100)} + {pt_label}"
s = TrendRiderV7Adaptive(target_vol=tv, min_lev=ml, pt_func=pt_fn)
run("Grid", label, s)
# =====================================================================
# Final ranking
# =====================================================================
results.sort(key=lambda x: x[2]["annualizedReturn"], reverse=True)
print(f"\n{'=' * 115}")
print(" FINAL RANKING (sorted by annualized return)")
print(f"{'=' * 115}")
print(f"{'#':<4} {'Group':<12} {'Config':<45} {'Ann%':>7} {'Vol%':>7} {'Sharpe':>7} "
f"{'Sortino':>8} {'MaxDD%':>7} {'Calmar':>7}")
print("-" * 115)
for i, (group, label, m) in enumerate(results, 1):
ann = m["annualizedReturn"] * 100
vol = m["annualizedVolatility"] * 100
sr = m["sharpeRatio"]
so = m["sortinoRatio"]
dd = m["maxDrawdown"] * 100
ca = m["calmarRatio"]
marker = "" if i <= 3 else ""
print(f"{i:<4} {group:<12} {label:<45} {ann:>6.1f}% {vol:>6.1f}% {sr:>7.2f} "
f"{so:>8.2f} {dd:>6.1f}% {ca:>7.2f}{marker}")
print(f"{'=' * 115}")
# Highlight top by Sharpe
by_sharpe = sorted(results, key=lambda x: x[2]["sharpeRatio"], reverse=True)
print("\nTop 5 by Sharpe:")
for i, (group, label, m) in enumerate(by_sharpe[:5], 1):
print(f" {i}. {label:<45} Sharpe={m['sharpeRatio']:.3f} "
f"Ann={m['annualizedReturn']*100:.1f}% MaxDD={m['maxDrawdown']*100:.1f}%")
by_calmar = sorted(results, key=lambda x: x[2]["calmarRatio"], reverse=True)
print("\nTop 5 by Calmar:")
for i, (group, label, m) in enumerate(by_calmar[:5], 1):
print(f" {i}. {label:<45} Calmar={m['calmarRatio']:.3f} "
f"Ann={m['annualizedReturn']*100:.1f}% MaxDD={m['maxDrawdown']*100:.1f}%")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,461 @@
"""Direction 2: V7 regime + synthetic 2x/3x leveraged individual stocks.
Hypothesis: replacing TQQQ/UPRO with synthetic 2x-leveraged top-momentum
S&P 500 stocks could beat V7 by combining stock-picking alpha with leverage.
Synthetic leverage model:
daily_return_Nx = N * stock_daily_return - (N-1) * daily_borrow_cost
daily_borrow_cost ≈ risk_free_rate / 252 (conservative: 5% annualized)
This captures:
- Leverage amplification
- Financing cost
- Volatility drag (emerges naturally from daily compounding of leveraged returns)
Variants tested:
A. V7 regime + synth 2x top-5 momentum stocks
B. V7 regime + synth 2x top-10 momentum stocks
C. V7 regime + synth 2x top-1 momentum stock (concentrated)
D. V7 regime + synth 3x top-5 (compare to real TQQQ)
E. V7 regime + synth 2x recovery-momentum top-5
F. V7+VT36 baseline (current SOTA)
"""
from __future__ import annotations
import sys
sys.path.insert(0, ".")
import numpy as np
import pandas as pd
import data_manager
import metrics
import universe_history as uh
from main import backtest
from strategies.base import Strategy
from strategies.permanent import TrendRiderV3
from universe import UNIVERSES
YEARS = 10
CAPITAL = 100_000
TX_COST = 0.001
FIXED_FEE = 2.0
BORROW_RATE = 0.05 # 5% annualized
# ---------------------------------------------------------------------------
# Synthetic leveraged returns
# ---------------------------------------------------------------------------
def synthetic_leveraged_prices(prices: pd.DataFrame, leverage: float,
borrow_rate: float = BORROW_RATE) -> pd.DataFrame:
"""Create synthetic leveraged price series from daily returns.
Models daily-rebalanced leverage: each day's return is
r_lev = leverage * r_stock - (leverage - 1) * r_borrow
where r_borrow = borrow_rate / 252.
This captures vol drag naturally (daily compounding of amplified returns).
"""
daily_ret = prices.pct_change(fill_method=None).fillna(0.0)
daily_borrow = borrow_rate / 252
lev_ret = leverage * daily_ret - (leverage - 1) * daily_borrow
lev_prices = (1 + lev_ret).cumprod() * 100 # normalize to 100 start
lev_prices.iloc[0] = 100
return lev_prices
# ---------------------------------------------------------------------------
# Strategy: V7 regime + synthetic leveraged stock picking
# ---------------------------------------------------------------------------
class V7SynthLeverage(Strategy):
"""V7 architecture with synthetic leveraged individual stocks as risk-on.
Layer 1: V3 regime engine on SPY → risk-on vs risk-off
Layer 2: Vol-target overlay
Layer 3: Profit-take with hysteresis
Risk-on: top-N stocks by momentum, synthetically leveraged, equal weight.
Risk-off: momentum leader of (GLD, DBC).
"""
def __init__(
self,
stock_tickers: list[str],
leverage: float = 2.0,
top_n: int = 5,
signal: str = "SPY",
defensive: tuple[str, ...] = ("GLD", "DBC"),
# Momentum ranking
mom_lookback: int = 63,
rebal_every: int = 21,
# Selection method
selection: str = "momentum", # "momentum" or "recovery_momentum"
recovery_window: int = 63,
long_mom_lookback: int = 252,
long_mom_skip: int = 21,
# V3 regime
ma_long: int = 150,
# Vol-target
target_vol: float = 0.36,
vol_window: int = 60,
min_lev: float = 0.75,
max_lev: float = 1.0,
# Profit-take
pt_threshold: float = 0.30,
pt_band: float = 0.10,
pt_park: str = "SHY",
):
self.stock_tickers = stock_tickers
self.leverage = leverage
self.top_n = top_n
self.signal = signal
self.defensive = defensive
self.mom_lookback = mom_lookback
self.rebal_every = rebal_every
self.selection = selection
self.recovery_window = recovery_window
self.long_mom_lookback = long_mom_lookback
self.long_mom_skip = long_mom_skip
self.target_vol = target_vol
self.vol_window = vol_window
self.min_lev = min_lev
self.max_lev = max_lev
self.pt_threshold = pt_threshold
self.pt_band = pt_band
self.pt_park = pt_park
self._v3 = TrendRiderV3(
signal=signal, risk_on=("TQQQ", "UPRO"),
risk_off=defensive, ma_long=ma_long,
)
def _rank_stocks(self, data: pd.DataFrame) -> pd.DataFrame:
"""Return cross-sectional rank (higher = better)."""
avail = [t for t in self.stock_tickers if t in data.columns]
panel = data[avail]
if self.selection == "recovery_momentum":
recovery = panel / panel.rolling(self.recovery_window).min() - 1
momentum = panel.shift(self.long_mom_skip).pct_change(
self.long_mom_lookback - self.long_mom_skip, fill_method=None,
)
rec_r = recovery.rank(axis=1, pct=True, na_option="keep")
mom_r = momentum.rank(axis=1, pct=True, na_option="keep")
composite = 0.5 * rec_r + 0.5 * mom_r
return composite
else:
mom = panel.pct_change(self.mom_lookback, fill_method=None)
return mom
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
"""Build weights on ORIGINAL (unleveraged) price columns.
The backtest engine will track returns using the original data.
We transform the returns in a wrapper (see run_synth_backtest below).
Actually — we build a SYNTHETIC price panel and run the strategy
on that. So weights here are on synthetic-leverage columns.
"""
# This is called on the synthetic data panel.
# Columns: stock tickers (synthetic leveraged) + ETFs (original)
w = pd.DataFrame(0.0, index=data.index, columns=data.columns)
if self.signal not in data.columns:
return w
sig_arr = data[self.signal].to_numpy()
avail_stocks = [t for t in self.stock_tickers if t in data.columns]
avail_def = [t for t in self.defensive if t in data.columns]
park_col = self.pt_park if self.pt_park in data.columns else ""
# Rank using the ORIGINAL unleveraged data — NOT passed here.
# We'll precompute ranks externally and attach them.
# For now, rank on the synthetic data (momentum on leveraged prices
# preserves ranking since leverage is monotone on return).
mom = data[avail_stocks].pct_change(self.mom_lookback, fill_method=None)
if self.selection == "recovery_momentum":
panel = data[avail_stocks]
recovery = panel / panel.rolling(self.recovery_window).min() - 1
long_mom = panel.shift(self.long_mom_skip).pct_change(
self.long_mom_lookback - self.long_mom_skip, fill_method=None,
)
rec_r = recovery.rank(axis=1, pct=True, na_option="keep")
mom_r = long_mom.rank(axis=1, pct=True, na_option="keep")
score = 0.5 * rec_r + 0.5 * mom_r
else:
score = mom
need = max(150, self.mom_lookback + 1, self._v3.vol_window + 1,
self._v3.dd_window, self._v3.peak_window,
self.long_mom_lookback + 1 if self.selection == "recovery_momentum" else 0,
self.recovery_window + 1 if self.selection == "recovery_momentum" else 0) + 1
regime: str | None = None
bars = 0
# Phase 1: build raw weights (regime + stock selection)
raw_w = pd.DataFrame(np.nan, index=data.index, columns=data.columns)
for i in range(len(data)):
if i < need:
continue
closes = sig_arr[:i]
if np.isnan(closes[-1]):
continue
desired = self._v3._desired_regime(closes, regime)
changed = False
if regime is None:
regime, bars, changed = desired, 0, True
else:
bars += 1
if desired != regime and bars >= 15:
regime, bars, changed = desired, 0, True
if not changed and (i - need) % self.rebal_every != 0:
continue
row = {c: 0.0 for c in data.columns}
dt = data.index[i]
if regime == "risk_on":
s = score.iloc[i][avail_stocks].dropna()
valid = s.index[data.loc[dt, s.index].notna()]
s = s[valid]
if self.selection == "momentum":
s = s[s > 0]
top = s.nlargest(min(self.top_n, len(s)))
if len(top) > 0:
wt = 1.0 / len(top)
for t in top.index:
row[t] = wt
elif avail_def:
row[avail_def[0]] = 1.0
else:
if avail_def:
dm = data[avail_def].pct_change(63, fill_method=None).iloc[i].dropna()
best = dm.idxmax() if len(dm) > 0 else avail_def[0]
row[best] = 1.0
for c, v in row.items():
raw_w.at[dt, c] = v
raw_w = raw_w.ffill().fillna(0.0)
raw_w = raw_w.shift(1).fillna(0.0)
# Phase 2: Vol-target overlay
daily_ret = data.pct_change(fill_method=None).fillna(0.0)
port_rets = (raw_w * daily_ret).sum(axis=1)
realized_vol = (
port_rets.rolling(self.vol_window, min_periods=21).std() * np.sqrt(252)
)
scale = (self.target_vol / realized_vol).clip(lower=self.min_lev, upper=self.max_lev)
scale = scale.shift(1).fillna(1.0)
w = raw_w.mul(scale, axis=0)
# Phase 3: Profit-take
if self.pt_threshold <= 0:
return w
held = w.idxmax(axis=1)
max_w = w.max(axis=1)
held[max_w < 1e-8] = ""
entry_price: float | None = None
current_sym: str | None = None
is_stopped = False
restore_level = self.pt_threshold - self.pt_band
for i in range(len(w)):
sym = held.iloc[i]
if not sym or max_w.iloc[i] < 1e-8:
current_sym = None
entry_price = None
is_stopped = False
continue
if sym != current_sym:
current_sym = sym
entry_price = (
float(data[sym].iloc[i - 1])
if i > 0 and sym in data.columns else None
)
is_stopped = False
continue
if entry_price is None or entry_price <= 0 or sym not in data.columns:
continue
yesterday = float(data[sym].iloc[i - 1]) if i > 0 else float(data[sym].iloc[i])
gain = yesterday / entry_price - 1.0
if is_stopped:
if gain < restore_level:
is_stopped = False
else:
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
else:
if gain >= self.pt_threshold:
is_stopped = True
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
return w
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
print("=" * 95)
print(" DIRECTION 2: V7 + SYNTHETIC LEVERAGED INDIVIDUAL STOCKS")
print("=" * 95)
# Load S&P 500 + PIT + ETFs
print("\n[1] Loading data...")
universe = UNIVERSES["us"]
tickers = universe["fetch"]()
pit_intervals = uh.load_sp500_history()
hist_tickers = uh.all_tickers_ever(pit_intervals)
etfs = ["SPY", "GLD", "DBC", "SHY", "TQQQ", "UPRO", "TLT"]
all_tickers = sorted(set(tickers + hist_tickers + etfs))
raw_data = data_manager.update("us", all_tickers, with_open=False)
if isinstance(raw_data, tuple):
raw_data = raw_data[0]
cutoff = raw_data.index[-1] - pd.DateOffset(years=YEARS)
raw_data = raw_data[raw_data.index >= cutoff]
raw_data = uh.mask_prices(raw_data, pit_intervals)
stock_tickers = [t for t in raw_data.columns
if t not in etfs and raw_data[t].notna().any()]
print(f" Stocks: {len(stock_tickers)}, Period: {raw_data.index[0].date()}{raw_data.index[-1].date()}")
# Build synthetic leveraged price panels
print("\n[2] Building synthetic leveraged prices...")
stock_prices = raw_data[stock_tickers]
synth_2x = synthetic_leveraged_prices(stock_prices, 2.0)
synth_3x = synthetic_leveraged_prices(stock_prices, 3.0)
# Combine synthetic stocks with real ETF prices for each variant
etf_prices = raw_data[etfs]
results: list[tuple[str, dict]] = []
def run(label: str, strategy: Strategy, data_panel: pd.DataFrame):
print(f" {label}...", end=" ", flush=True)
try:
eq = backtest(strategy, data_panel, initial_capital=CAPITAL,
transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
m = metrics.raw_summary(eq)
results.append((label, m))
print(f"Ann={m['annualizedReturn']*100:.1f}% Sharpe={m['sharpeRatio']:.2f} "
f"MaxDD={m['maxDrawdown']*100:.1f}%")
except Exception as e:
print(f"FAILED: {e}")
# =====================================================================
# Run variants
# =====================================================================
print("\n[3] Running strategies...")
# --- V7+VT36 baseline (real TQQQ/UPRO) ---
from strategies.trend_rider_v7 import TrendRiderV7
etf_only = [t for t in ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"] if t in etf_prices.columns]
run("V7+VT36 baseline (TQQQ/UPRO)",
TrendRiderV7(target_vol=0.36, min_lev=0.75),
etf_prices[etf_only])
# --- Synth 2x: momentum, various top-N ---
for n in (1, 3, 5, 10):
panel_2x = pd.concat([synth_2x, etf_prices], axis=1)
panel_2x = panel_2x.loc[:, ~panel_2x.columns.duplicated()]
run(f"Synth 2x Mom top-{n} (VT36+PT30)",
V7SynthLeverage(stock_tickers=stock_tickers, leverage=2.0,
top_n=n, target_vol=0.36, min_lev=0.75),
panel_2x)
# --- Synth 2x: recovery-momentum ---
for n in (3, 5, 10):
panel_2x = pd.concat([synth_2x, etf_prices], axis=1)
panel_2x = panel_2x.loc[:, ~panel_2x.columns.duplicated()]
run(f"Synth 2x RecMom top-{n} (VT36+PT30)",
V7SynthLeverage(stock_tickers=stock_tickers, leverage=2.0,
top_n=n, selection="recovery_momentum",
target_vol=0.36, min_lev=0.75),
panel_2x)
# --- Synth 3x: direct comparison with real TQQQ ---
for n in (1, 3, 5):
panel_3x = pd.concat([synth_3x, etf_prices], axis=1)
panel_3x = panel_3x.loc[:, ~panel_3x.columns.duplicated()]
run(f"Synth 3x Mom top-{n} (VT36+PT30)",
V7SynthLeverage(stock_tickers=stock_tickers, leverage=3.0,
top_n=n, target_vol=0.36, min_lev=0.75),
panel_3x)
# --- Synth 2x without vol-target (see if raw 2x stocks need less VT) ---
for n in (3, 5):
panel_2x = pd.concat([synth_2x, etf_prices], axis=1)
panel_2x = panel_2x.loc[:, ~panel_2x.columns.duplicated()]
run(f"Synth 2x Mom top-{n} (no VT, PT30)",
V7SynthLeverage(stock_tickers=stock_tickers, leverage=2.0,
top_n=n, target_vol=1.0, min_lev=1.0, max_lev=1.0),
panel_2x)
# --- Synth 2x with higher PT threshold (2x has less vol drag → let profits run) ---
for pt in (0.40, 0.50):
panel_2x = pd.concat([synth_2x, etf_prices], axis=1)
panel_2x = panel_2x.loc[:, ~panel_2x.columns.duplicated()]
run(f"Synth 2x Mom top-5 (VT36+PT{int(pt*100)})",
V7SynthLeverage(stock_tickers=stock_tickers, leverage=2.0,
top_n=5, target_vol=0.36, min_lev=0.75,
pt_threshold=pt, pt_band=pt*0.33),
panel_2x)
# --- Synth 2x: no profit-take (2x might not need it) ---
panel_2x = pd.concat([synth_2x, etf_prices], axis=1)
panel_2x = panel_2x.loc[:, ~panel_2x.columns.duplicated()]
run("Synth 2x Mom top-5 (VT36, no PT)",
V7SynthLeverage(stock_tickers=stock_tickers, leverage=2.0,
top_n=5, target_vol=0.36, min_lev=0.75,
pt_threshold=0),
panel_2x)
# --- SPY benchmark ---
spy = raw_data["SPY"].dropna()
spy_eq = (spy / spy.iloc[0]) * CAPITAL
results.append(("SPY benchmark", metrics.raw_summary(spy_eq)))
# =====================================================================
# Report
# =====================================================================
results.sort(key=lambda x: x[1]["annualizedReturn"], reverse=True)
print(f"\n{'=' * 110}")
print(" RANKING")
print(f"{'=' * 110}")
print(f"{'#':<4} {'Strategy':<45} {'Ann%':>7} {'Vol%':>7} {'Sharpe':>7} "
f"{'Sortino':>8} {'MaxDD%':>7} {'Calmar':>7}")
print("-" * 110)
for i, (label, m) in enumerate(results, 1):
marker = "" if i <= 3 else ""
print(f"{i:<4} {label:<45} "
f"{m['annualizedReturn']*100:>6.1f}% "
f"{m['annualizedVolatility']*100:>6.1f}% "
f"{m['sharpeRatio']:>7.2f} "
f"{m['sortinoRatio']:>8.2f} "
f"{m['maxDrawdown']*100:>6.1f}% "
f"{m['calmarRatio']:>7.2f}{marker}")
print(f"{'=' * 110}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,385 @@
"""Test three structural improvements to V7+VT36 identified by independent review.
Idea 1: PT entry price reset on restore (fix stale anchor)
Idea 2: TMF (3x bonds) in risk-off basket with TLT MA gate
Idea 3: Open-price fast exit overlay for crash protection
All tested against V7+VT36 baseline (61.2% Ann, Sharpe 1.89, MaxDD -29.2%).
"""
from __future__ import annotations
import sys
sys.path.insert(0, ".")
import numpy as np
import pandas as pd
import data_manager
import metrics
from main import backtest
from strategies.base import Strategy
from strategies.permanent import TrendRiderV3
YEARS = 10
CAPITAL = 100_000
TX_COST = 0.001
FIXED_FEE = 2.0
# =========================================================================
# V7 with all three ideas as toggleable flags
# =========================================================================
class TrendRiderV7X(Strategy):
"""V7 extended with three structural improvements.
Flags:
reset_entry_on_restore: Idea 1 — reset entry_price when PT restores.
tmf_risk_off: Idea 2 — include TMF in risk-off when TLT > MA.
fast_exit: Idea 3 — emergency exit when SPY opens below threshold.
"""
def __init__(
self,
# V3 regime
ma_long: int = 150,
signal: str = "SPY",
risk_on: tuple[str, ...] = ("TQQQ", "UPRO"),
risk_off: tuple[str, ...] = ("GLD", "DBC"),
# Vol-target
target_vol: float = 0.36,
vol_window: int = 60,
min_lev: float = 0.75,
max_lev: float = 1.0,
# Profit-take
pt_threshold: float = 0.30,
pt_band: float = 0.10,
pt_park: str = "SHY",
# === Idea 1: reset entry on restore ===
reset_entry_on_restore: bool = False,
# === Idea 2: TMF risk-off with bond gate ===
tmf_risk_off: bool = False,
tmf_symbol: str = "TMF",
tlt_symbol: str = "TLT",
tlt_ma_window: int = 200,
# === Idea 3: fast exit on open ===
fast_exit: bool = False,
fast_exit_gap_pct: float = -0.03,
fast_exit_low_window: int = 20,
# V3 passthrough
**v3_kwargs,
) -> None:
self.target_vol = target_vol
self.vol_window = vol_window
self.min_lev = min_lev
self.max_lev = max_lev
self.pt_threshold = pt_threshold
self.pt_band = pt_band
self.pt_park = pt_park
self.signal = signal
self.risk_off_base = risk_off
self.reset_entry_on_restore = reset_entry_on_restore
self.tmf_risk_off = tmf_risk_off
self.tmf_symbol = tmf_symbol
self.tlt_symbol = tlt_symbol
self.tlt_ma_window = tlt_ma_window
self.fast_exit = fast_exit
self.fast_exit_gap_pct = fast_exit_gap_pct
self.fast_exit_low_window = fast_exit_low_window
self.v3 = TrendRiderV3(
signal=signal, risk_on=risk_on, risk_off=risk_off,
ma_long=ma_long, **v3_kwargs,
)
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
# --- Layer 1: V3 regime weights ---
w = self.v3.generate_signals(data)
# --- Idea 2: dynamically swap risk-off pick to TMF when bond regime is bullish ---
if self.tmf_risk_off and self.tmf_symbol in data.columns and self.tlt_symbol in data.columns:
tlt = data[self.tlt_symbol]
tlt_ma = tlt.rolling(self.tlt_ma_window).mean()
tlt_bull = (tlt > tlt_ma).shift(1).fillna(False)
risk_off_cols = [c for c in self.risk_off_base if c in w.columns]
tmf_col = self.tmf_symbol
if tmf_col not in w.columns:
w[tmf_col] = 0.0
for i in range(len(w)):
roff_weight = sum(w.iloc[i].get(c, 0.0) for c in risk_off_cols)
if roff_weight < 1e-8:
continue
if tlt_bull.iloc[i]:
# Candidate: TMF vs best of original risk-off by momentum
mom_lookback = 63
if i >= mom_lookback + 1:
best_sym = tmf_col
best_r = -np.inf
candidates = risk_off_cols + [tmf_col]
for sym in candidates:
if sym not in data.columns:
continue
p_now = data[sym].iloc[i - 1]
p_past = data[sym].iloc[i - 1 - mom_lookback]
if pd.notna(p_now) and pd.notna(p_past) and p_past > 0:
r = p_now / p_past - 1.0
if r > best_r:
best_r, best_sym = r, sym
# Reassign risk-off weight to the winner
for c in risk_off_cols:
w.iat[i, w.columns.get_loc(c)] = 0.0
if tmf_col in w.columns:
w.iat[i, w.columns.get_loc(tmf_col)] = 0.0
if best_sym in w.columns:
w.iat[i, w.columns.get_loc(best_sym)] = roff_weight
# --- Idea 3: fast exit overlay (check SPY open for gap-downs) ---
if self.fast_exit and self.signal in data.columns:
spy = data[self.signal]
spy_arr = spy.to_numpy()
risk_on_cols = list(self.v3.risk_on)
risk_off_cols_fast = [c for c in self.risk_off_base if c in w.columns]
park = self.pt_park if self.pt_park in w.columns else ""
for i in range(max(self.fast_exit_low_window + 1, 2), len(w)):
# Check if currently in risk-on
ron_weight = sum(float(w.iloc[i].get(c, 0.0))
for c in risk_on_cols if c in w.columns)
if ron_weight < 1e-8:
continue
prev_close = spy_arr[i - 1]
if np.isnan(prev_close) or prev_close <= 0:
continue
# Gap-down check: today's "effective open" approximated by
# checking if yesterday's close is below N-day low
low_window = spy_arr[max(0, i - 1 - self.fast_exit_low_window):i - 1]
if len(low_window) == 0:
continue
low_val = np.nanmin(low_window)
# Trigger 1: close below N-day low
trigger_low = prev_close <= low_val
# Trigger 2: large single-day drop (gap-down proxy using close-to-close)
if i >= 2:
prev2_close = spy_arr[i - 2]
daily_ret = (prev_close / prev2_close - 1.0) if prev2_close > 0 else 0.0
trigger_gap = daily_ret <= self.fast_exit_gap_pct
else:
trigger_gap = False
if trigger_low or trigger_gap:
# Emergency: zero out risk-on, move to park
for c in risk_on_cols:
if c in w.columns:
w.iat[i, w.columns.get_loc(c)] = 0.0
if park and park in w.columns:
w.iat[i, w.columns.get_loc(park)] = 1.0
# --- Layer 2: Vol-target overlay ---
daily_ret = data.pct_change(fill_method=None).fillna(0.0)
# Only use columns present in w
common_cols = w.columns.intersection(daily_ret.columns)
port_rets = (w[common_cols] * daily_ret[common_cols]).sum(axis=1)
realized_vol = (
port_rets.rolling(self.vol_window, min_periods=21).std() * np.sqrt(252)
)
scale = (self.target_vol / realized_vol).clip(
lower=self.min_lev, upper=self.max_lev,
)
scale = scale.shift(1).fillna(1.0)
w = w.mul(scale, axis=0)
# --- Layer 3: Profit-take with hysteresis ---
if self.pt_threshold <= 0:
return w
held = w.idxmax(axis=1)
max_w = w.max(axis=1)
held[max_w < 1e-8] = ""
park_col = self.pt_park if self.pt_park in w.columns else ""
entry_price: float | None = None
current_sym: str | None = None
is_stopped = False
restore_level = self.pt_threshold - self.pt_band
for i in range(len(w)):
sym = held.iloc[i]
if not sym or max_w.iloc[i] < 1e-8:
current_sym = None
entry_price = None
is_stopped = False
continue
if sym != current_sym:
current_sym = sym
entry_price = (
float(data[sym].iloc[i - 1])
if i > 0 and sym in data.columns else None
)
is_stopped = False
continue
if entry_price is None or entry_price <= 0 or sym not in data.columns:
continue
yesterday = float(data[sym].iloc[i - 1]) if i > 0 else float(data[sym].iloc[i])
gain = yesterday / entry_price - 1.0
if is_stopped:
if gain < restore_level:
is_stopped = False
# === Idea 1: reset entry price on restore ===
if self.reset_entry_on_restore:
entry_price = yesterday
else:
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
else:
if gain >= self.pt_threshold:
is_stopped = True
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
return w
# =========================================================================
# Main
# =========================================================================
def main():
print("=" * 100)
print(" THREE IDEAS EVALUATION")
print("=" * 100)
# Load data including TMF and TLT
all_etfs = sorted(set([
"SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY", "TLT", "TMF",
]))
print(f"\nLoading ETFs: {all_etfs}")
data = data_manager.update("etfs", all_etfs, with_open=False)
if isinstance(data, tuple):
data = data[0]
cutoff = data.index[-1] - pd.DateOffset(years=YEARS)
data = data[data.index >= cutoff]
avail = sorted(data.columns.tolist())
print(f"Available: {avail}")
print(f"Period: {data.index[0].date()}{data.index[-1].date()}")
results: list[tuple[str, dict]] = []
def run(label, strategy):
eq = backtest(strategy, data, initial_capital=CAPITAL,
transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
m = metrics.raw_summary(eq)
results.append((label, m))
print(f" {label:<60} Ann={m['annualizedReturn']*100:>5.1f}% "
f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}% "
f"Sortino={m['sortinoRatio']:.2f} Calmar={m['calmarRatio']:.2f}")
# Baseline
print("\n--- Baseline ---")
run("V7+VT36 (baseline)",
TrendRiderV7X(target_vol=0.36, min_lev=0.75))
# === Idea 1: PT entry reset ===
print("\n--- Idea 1: PT Entry Price Reset ---")
run("V7+VT36 + PT reset",
TrendRiderV7X(target_vol=0.36, min_lev=0.75,
reset_entry_on_restore=True))
# Also test with different PT thresholds to see if reset changes the optimum
for pt in (0.20, 0.25, 0.30, 0.35, 0.40):
run(f" PT reset + PT{int(pt*100)}",
TrendRiderV7X(target_vol=0.36, min_lev=0.75,
reset_entry_on_restore=True,
pt_threshold=pt, pt_band=pt * 0.33))
# === Idea 2: TMF risk-off ===
print("\n--- Idea 2: TMF in Risk-Off ---")
if "TMF" in data.columns and "TLT" in data.columns:
run("V7+VT36 + TMF risk-off (TLT gate)",
TrendRiderV7X(target_vol=0.36, min_lev=0.75,
tmf_risk_off=True))
# TMF + PT reset combo
run("V7+VT36 + TMF + PT reset",
TrendRiderV7X(target_vol=0.36, min_lev=0.75,
tmf_risk_off=True, reset_entry_on_restore=True))
# Different TLT MA windows
for tlt_ma in (100, 150, 200, 250):
run(f" TMF risk-off (TLT MA{tlt_ma})",
TrendRiderV7X(target_vol=0.36, min_lev=0.75,
tmf_risk_off=True, tlt_ma_window=tlt_ma))
else:
print(" TMF or TLT not available, skipping")
# === Idea 3: Fast exit ===
print("\n--- Idea 3: Fast Exit ---")
for gap in (-0.02, -0.03, -0.04):
for low_w in (10, 20):
run(f"V7+VT36 + fast exit (gap={gap:.0%}, low={low_w}d)",
TrendRiderV7X(target_vol=0.36, min_lev=0.75,
fast_exit=True, fast_exit_gap_pct=gap,
fast_exit_low_window=low_w))
# === All three combined ===
print("\n--- All Three Combined ---")
if "TMF" in data.columns:
run("V7+VT36 + ALL (reset+TMF+fast exit)",
TrendRiderV7X(target_vol=0.36, min_lev=0.75,
reset_entry_on_restore=True,
tmf_risk_off=True,
fast_exit=True, fast_exit_gap_pct=-0.03,
fast_exit_low_window=20))
# Best combo tuning
for pt in (0.25, 0.30, 0.35):
run(f" ALL + PT{int(pt*100)}",
TrendRiderV7X(target_vol=0.36, min_lev=0.75,
reset_entry_on_restore=True,
tmf_risk_off=True,
fast_exit=True, fast_exit_gap_pct=-0.03,
fast_exit_low_window=20,
pt_threshold=pt, pt_band=pt * 0.33))
# Final ranking
results.sort(key=lambda x: x[1]["sharpeRatio"], reverse=True)
print(f"\n{'=' * 110}")
print(" FINAL RANKING (by Sharpe)")
print(f"{'=' * 110}")
print(f"{'#':<4} {'Strategy':<60} {'Ann%':>6} {'Vol%':>6} {'Sharpe':>7} "
f"{'Sortino':>8} {'MaxDD%':>7} {'Calmar':>7}")
print("-" * 110)
for i, (label, m) in enumerate(results, 1):
marker = "" if i <= 3 else ""
print(f"{i:<4} {label:<60} "
f"{m['annualizedReturn']*100:>5.1f}% "
f"{m['annualizedVolatility']*100:>5.1f}% "
f"{m['sharpeRatio']:>7.2f} "
f"{m['sortinoRatio']:>8.2f} "
f"{m['maxDrawdown']*100:>6.1f}% "
f"{m['calmarRatio']:>7.2f}{marker}")
print(f"{'=' * 110}")
# Also rank by Ann return
results.sort(key=lambda x: x[1]["annualizedReturn"], reverse=True)
print(f"\n Top 5 by Annualized Return:")
for i, (label, m) in enumerate(results[:5], 1):
print(f" {i}. {label:<55} Ann={m['annualizedReturn']*100:.1f}% "
f"Sharpe={m['sharpeRatio']:.2f} MaxDD={m['maxDrawdown']*100:.1f}%")
if __name__ == "__main__":
main()

501
research/v7_trade_audit.py Normal file
View File

@@ -0,0 +1,501 @@
"""V7+VT36 full trade log: regime changes, instrument switches, PT events,
vol-target scale, contribution analysis.
Run from /home/gahow/projects/quant:
uv run python research/v7_trade_audit.py
"""
from __future__ import annotations
import sys
sys.path.insert(0, ".")
import numpy as np
import pandas as pd
import data_manager
import metrics
from main import backtest
from strategies.trend_rider_v7 import TrendRiderV7
YEARS = 10
CAPITAL = 100_000
TX_COST = 0.001
FIXED_FEE = 2.0
def load_data():
tickers = sorted(set(["SPY", "TQQQ", "UPRO", "GLD", "DBC", "SHY"]))
data = data_manager.update("etfs", tickers, with_open=False)
if isinstance(data, tuple):
data = data[0]
cutoff = data.index[-1] - pd.DateOffset(years=YEARS)
data = data[data.index >= cutoff]
cols = [c for c in tickers if c in data.columns]
return data[cols]
def trace_v7(data: pd.DataFrame):
"""Replicate V7+VT36 signal generation step-by-step, logging every event."""
v7 = TrendRiderV7(target_vol=0.36, min_lev=0.75)
# --- Layer 1: V3 regime weights ---
w_v3 = v7.v3.generate_signals(data)
# --- Layer 2: Vol-target overlay ---
daily_ret = data.pct_change(fill_method=None).fillna(0.0)
port_rets_v3 = (w_v3 * daily_ret).sum(axis=1)
realized_vol = (
port_rets_v3.rolling(v7.vol_window, min_periods=21).std() * np.sqrt(252)
)
scale_raw = (v7.target_vol / realized_vol).clip(lower=v7.min_lev, upper=v7.max_lev)
scale = scale_raw.shift(1).fillna(1.0)
w_vt = w_v3.mul(scale, axis=0)
# --- Layer 3: Profit-take (replicate the loop, logging events) ---
w = w_vt.copy()
held = w.idxmax(axis=1)
max_w = w.max(axis=1)
held[max_w < 1e-8] = ""
park_col = v7.pt_park if v7.pt_park in w.columns else ""
entry_price = None
current_sym = None
is_stopped = False
restore_level = v7.pt_threshold - v7.pt_band
pt_events = [] # (date, type, sym, entry_price, exit_price, gain%)
for i in range(len(w)):
sym = held.iloc[i]
if not sym or max_w.iloc[i] < 1e-8:
current_sym = None
entry_price = None
is_stopped = False
continue
if sym != current_sym:
current_sym = sym
entry_price = float(data[sym].iloc[i - 1]) if i > 0 and sym in data.columns else None
is_stopped = False
continue
if entry_price is None or entry_price <= 0 or sym not in data.columns:
continue
yesterday = float(data[sym].iloc[i - 1]) if i > 0 else float(data[sym].iloc[i])
gain = yesterday / entry_price - 1.0
if is_stopped:
if gain < restore_level:
is_stopped = False
pt_events.append((data.index[i], "RESTORE", sym, entry_price, yesterday, gain))
else:
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
else:
if gain >= v7.pt_threshold:
is_stopped = True
pt_events.append((data.index[i], "CLEAR", sym, entry_price, yesterday, gain))
w.iloc[i] = 0.0
if park_col:
w.at[w.index[i], park_col] = scale.iloc[i]
return w_v3, w_vt, w, scale, pt_events
def analyze_regime(w_v3: pd.DataFrame, data: pd.DataFrame):
"""Trace regime changes from V3 weights."""
held_v3 = w_v3.idxmax(axis=1)
max_w = w_v3.max(axis=1)
held_v3[max_w < 1e-8] = ""
risk_on_syms = {"TQQQ", "UPRO"}
risk_off_syms = {"GLD", "DBC"}
events = []
prev_regime = None
prev_sym = None
regime_start = None
for i in range(len(held_v3)):
sym = held_v3.iloc[i]
if not sym:
continue
regime = "risk_on" if sym in risk_on_syms else ("risk_off" if sym in risk_off_syms else "other")
if regime != prev_regime:
if prev_regime is not None:
events.append({
"date": data.index[i],
"type": "REGIME_CHANGE",
"from": prev_regime,
"to": regime,
"from_sym": prev_sym,
"to_sym": sym,
})
prev_regime = regime
regime_start = data.index[i]
prev_sym = sym
elif sym != prev_sym:
events.append({
"date": data.index[i],
"type": "INSTRUMENT_SWITCH",
"regime": regime,
"from_sym": prev_sym,
"to_sym": sym,
})
prev_sym = sym
return events, held_v3
def compute_contributions(data, w_v3, w_vt, w_final, scale, pt_events):
"""Return attribution of returns to risk-on, risk-off, PT-park, and vol-target."""
daily_ret = data.pct_change(fill_method=None).fillna(0.0)
# Identify which days are in PT-park (cleared to cash by PT layer)
# NOTE: SHY is not in V3's output columns, so PT clears to 0% (cash), not SHY.
risk_on_syms = {"TQQQ", "UPRO"}
risk_off_syms = {"GLD", "DBC"}
vt_sum = w_vt.abs().sum(axis=1)
final_sum = w_final.abs().sum(axis=1)
held_final = w_final.idxmax(axis=1)
max_w_final = w_final.max(axis=1)
held_final[max_w_final < 1e-8] = ""
held_vt = w_vt.idxmax(axis=1)
max_w_vt = w_vt.max(axis=1)
held_vt[max_w_vt < 1e-8] = ""
# Classify each day
day_class = pd.Series("none", index=data.index)
for i in range(len(data)):
sym_vt = held_vt.iloc[i]
sym_final = held_final.iloc[i]
# PT-park: VT layer has allocation but final layer cleared to 0
if vt_sum.iloc[i] > 0.01 and final_sum.iloc[i] < 0.01:
day_class.iloc[i] = "pt_park"
elif not sym_final:
continue
elif sym_final in risk_on_syms:
day_class.iloc[i] = "risk_on"
elif sym_final in risk_off_syms:
day_class.iloc[i] = "risk_off"
elif sym_final == "SHY":
day_class.iloc[i] = "risk_off_shy"
else:
day_class.iloc[i] = "other"
# Daily portfolio return for each layer
port_ret_v3 = (w_v3 * daily_ret).sum(axis=1)
port_ret_vt = (w_vt * daily_ret).sum(axis=1)
port_ret_final = (w_final * daily_ret).sum(axis=1)
# Contribution by regime state
contrib = {}
for cls in ["risk_on", "risk_off", "risk_off_shy", "pt_park", "none"]:
mask = day_class == cls
contrib[cls] = {
"days": int(mask.sum()),
"cum_return": float((1 + port_ret_final[mask]).prod() - 1) if mask.any() else 0.0,
"avg_daily_ret": float(port_ret_final[mask].mean()) if mask.any() else 0.0,
}
# Vol-target impact: compare V3-only vs V3+VT
eq_v3 = (1 + port_ret_v3).cumprod() * CAPITAL
eq_vt = (1 + port_ret_vt).cumprod() * CAPITAL
eq_final = (1 + port_ret_final).cumprod() * CAPITAL
return day_class, contrib, eq_v3, eq_vt, eq_final
def main():
data = load_data()
print(f"Data: {data.index[0].date()} to {data.index[-1].date()}, {len(data)} days")
print(f"Columns: {list(data.columns)}")
w_v3, w_vt, w_final, scale, pt_events = trace_v7(data)
# =========================================================================
# 1. Regime changes and instrument switches
# =========================================================================
regime_events, held_v3 = analyze_regime(w_v3, data)
print(f"\n{'='*90}")
print(" REGIME CHANGES AND INSTRUMENT SWITCHES")
print(f"{'='*90}")
regime_changes = [e for e in regime_events if e["type"] == "REGIME_CHANGE"]
instrument_switches = [e for e in regime_events if e["type"] == "INSTRUMENT_SWITCH"]
print(f"\nTotal regime changes: {len(regime_changes)}")
print(f"Total instrument switches (within regime): {len(instrument_switches)}")
print(f"\n--- Regime Changes ---")
for e in regime_changes:
print(f" {e['date'].date()} {e['from']:>9} -> {e['to']:<9} ({e['from_sym']} -> {e['to_sym']})")
print(f"\n--- Instrument Switches ---")
for e in instrument_switches:
print(f" {e['date'].date()} [{e['regime']:>9}] {e['from_sym']} -> {e['to_sym']}")
# Regime holding periods
risk_on_syms = {"TQQQ", "UPRO"}
risk_off_syms = {"GLD", "DBC"}
regime_series = pd.Series("none", index=data.index)
for i in range(len(held_v3)):
sym = held_v3.iloc[i]
if sym in risk_on_syms:
regime_series.iloc[i] = "risk_on"
elif sym in risk_off_syms:
regime_series.iloc[i] = "risk_off"
active = regime_series[regime_series != "none"]
if len(active) > 0:
pct_risk_on = (active == "risk_on").mean()
pct_risk_off = (active == "risk_off").mean()
print(f"\n Time in risk_on: {pct_risk_on:.1%}")
print(f" Time in risk_off: {pct_risk_off:.1%}")
# Average holding period per regime stint
regime_shifts = active != active.shift(1)
stint_id = regime_shifts.cumsum()
stint_lengths = stint_id.groupby(stint_id).count()
avg_stint = stint_lengths.mean()
print(f" Avg regime stint: {avg_stint:.0f} trading days")
# =========================================================================
# 2. Profit-take events
# =========================================================================
print(f"\n{'='*90}")
print(" PROFIT-TAKE EVENTS")
print(f"{'='*90}")
print(f"\nTotal PT events: {len(pt_events)}")
clears = [e for e in pt_events if e[1] == "CLEAR"]
restores = [e for e in pt_events if e[1] == "RESTORE"]
print(f" CLEAR events: {len(clears)}")
print(f" RESTORE events: {len(restores)}")
if clears:
print(f"\n--- CLEAR Events ---")
gains = []
for date, typ, sym, entry, exit_p, gain in clears:
print(f" {date.date()} {sym:>5} entry=${entry:.2f} exit=${exit_p:.2f} gain={gain:+.1%}")
gains.append(gain)
print(f"\n Avg gain at CLEAR: {np.mean(gains):.1%}")
print(f" Min gain at CLEAR: {np.min(gains):.1%}")
print(f" Max gain at CLEAR: {np.max(gains):.1%}")
if restores:
print(f"\n--- RESTORE Events ---")
for date, typ, sym, entry, exit_p, gain in restores:
print(f" {date.date()} {sym:>5} entry=${entry:.2f} price=${exit_p:.2f} gain={gain:+.1%}")
# PT-park days
day_class, contrib, eq_v3, eq_vt, eq_final = compute_contributions(
data, w_v3, w_vt, w_final, scale, pt_events)
pt_park_days = (day_class == "pt_park").sum()
total_active_days = (day_class != "none").sum()
print(f"\n Days in PT-park (SHY): {pt_park_days} ({pt_park_days/total_active_days:.1%} of active days)")
# =========================================================================
# 3. Vol-target scale analysis
# =========================================================================
print(f"\n{'='*90}")
print(" VOL-TARGET SCALE ANALYSIS")
print(f"{'='*90}")
active_scale = scale[scale < 1.0 - 1e-6]
print(f"\n Scale stats (when < 1.0):")
print(f" Days at full scale (1.0): {(scale >= 1.0 - 1e-6).sum()}")
print(f" Days below 1.0: {len(active_scale)}")
print(f" Days below 0.90: {(scale < 0.90).sum()}")
print(f" Days below 0.80: {(scale < 0.80).sum()}")
print(f" Days at floor (0.75): {(scale <= 0.75 + 1e-6).sum()}")
print(f" Min scale: {scale.min():.3f}")
print(f" Mean scale: {scale.mean():.3f}")
# When did scale hit the floor?
at_floor = scale[scale <= 0.75 + 1e-6]
if len(at_floor) > 0:
print(f"\n Periods at floor (scale=0.75):")
floor_mask = scale <= 0.75 + 1e-6
shifts = floor_mask != floor_mask.shift(1)
stint_ids = shifts.cumsum()
floor_stints = stint_ids[floor_mask]
for stint_id_val in floor_stints.unique():
stint_dates = floor_stints[floor_stints == stint_id_val].index
if len(stint_dates) > 0:
print(f" {stint_dates[0].date()} to {stint_dates[-1].date()} ({len(stint_dates)} days)")
# =========================================================================
# 4. Contribution analysis
# =========================================================================
print(f"\n{'='*90}")
print(" RETURN CONTRIBUTION BY STATE")
print(f"{'='*90}")
for cls in ["risk_on", "risk_off", "risk_off_shy", "pt_park"]:
c = contrib[cls]
print(f"\n {cls:>14}: {c['days']:>5} days "
f"cum_return={c['cum_return']:>+8.1%} "
f"avg_daily={c['avg_daily_ret']*10000:>+6.1f}bps")
# =========================================================================
# 5. With vs Without vol-target
# =========================================================================
print(f"\n{'='*90}")
print(" VOL-TARGET IMPACT (V3 alone vs V3+VT)")
print(f"{'='*90}")
daily_ret = data.pct_change(fill_method=None).fillna(0.0)
# V3-only with turnover cost
eq_v3_bt = backtest(
type("V3Only", (), {"generate_signals": lambda self, d: TrendRiderV7(target_vol=0.36, min_lev=0.75).v3.generate_signals(d)})(),
data, initial_capital=CAPITAL, transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
m_v3 = metrics.raw_summary(eq_v3_bt)
# V7+VT36 full
eq_full = backtest(
TrendRiderV7(target_vol=0.36, min_lev=0.75),
data, initial_capital=CAPITAL, transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
m_full = metrics.raw_summary(eq_full)
# V7 with VT but no PT
eq_no_pt = backtest(
TrendRiderV7(target_vol=0.36, min_lev=0.75, pt_threshold=0.0),
data, initial_capital=CAPITAL, transaction_cost=TX_COST, fixed_fee=FIXED_FEE)
m_no_pt = metrics.raw_summary(eq_no_pt)
print(f"\n {'Variant':<30} {'Ann':>7} {'Sharpe':>7} {'MaxDD':>7} {'Sortino':>8} {'Calmar':>7}")
print(f" {'-'*67}")
for label, m in [("V3 only (no VT, no PT)", m_v3),
("V3 + VT (no PT)", m_no_pt),
("V7+VT36 full (VT+PT)", m_full)]:
print(f" {label:<30} {m['annualizedReturn']*100:>6.1f}% {m['sharpeRatio']:>7.2f} "
f"{m['maxDrawdown']*100:>6.1f}% {m['sortinoRatio']:>8.2f} {m['calmarRatio']:>7.2f}")
# =========================================================================
# 6. Drawdown analysis
# =========================================================================
print(f"\n{'='*90}")
print(" WORST DRAWDOWN PERIODS")
print(f"{'='*90}")
rolling_peak = eq_full.cummax()
dd = (eq_full - rolling_peak) / rolling_peak
# Find top 5 drawdown troughs
dd_sorted = dd.sort_values()
seen_windows = []
top_dds = []
for date, dd_val in dd_sorted.items():
too_close = False
for prev_date in seen_windows:
if abs((date - prev_date).days) < 60:
too_close = True
break
if not too_close:
top_dds.append((date, dd_val))
seen_windows.append(date)
if len(top_dds) >= 5:
break
for trough_date, trough_dd in top_dds:
# Find start of this drawdown (last peak before trough)
pre_trough = rolling_peak.loc[:trough_date]
peak_val = pre_trough.iloc[-1]
peak_dates = eq_full[eq_full >= peak_val * 0.999].loc[:trough_date]
if len(peak_dates) > 0:
peak_date = peak_dates.index[0]
else:
peak_date = trough_date
# What was the strategy holding?
held_during = day_class.loc[peak_date:trough_date]
regime_during = held_during.value_counts()
# Recovery date
post_trough = eq_full.loc[trough_date:]
recovered = post_trough[post_trough >= peak_val]
recovery_date = recovered.index[0] if len(recovered) > 0 else None
print(f"\n DD {trough_dd:.1%} | {peak_date.date()} -> {trough_date.date()}"
f" | Recovery: {recovery_date.date() if recovery_date else 'N/A'}")
print(f" State during DD: {dict(regime_during)}")
# What instrument was held at the trough?
held_final = w_final.idxmax(axis=1)
max_w_final = w_final.max(axis=1)
held_final[max_w_final < 1e-8] = ""
trough_sym = held_final.loc[trough_date] if trough_date in held_final.index else "?"
print(f" Held at trough: {trough_sym}")
# =========================================================================
# 7. Disproportionate trades
# =========================================================================
print(f"\n{'='*90}")
print(" TOP 10 BEST AND WORST SINGLE-DAY RETURNS")
print(f"{'='*90}")
port_ret_final = (w_final * daily_ret).sum(axis=1)
# Apply turnover cost
turnover = w_final.diff().abs().sum(axis=1).fillna(0.0)
port_ret_final_net = port_ret_final - turnover * TX_COST
held_final = w_final.idxmax(axis=1)
max_w_final = w_final.max(axis=1)
held_final[max_w_final < 1e-8] = ""
best = port_ret_final_net.nlargest(10)
worst = port_ret_final_net.nsmallest(10)
print("\n BEST DAYS:")
for date, ret in best.items():
sym = held_final.loc[date] if date in held_final.index else "?"
sc = scale.loc[date] if date in scale.index else 1.0
print(f" {date.date()} {ret:>+7.2%} holding={sym} scale={sc:.2f}")
print("\n WORST DAYS:")
for date, ret in worst.items():
sym = held_final.loc[date] if date in held_final.index else "?"
sc = scale.loc[date] if date in scale.index else 1.0
print(f" {date.date()} {ret:>+7.2%} holding={sym} scale={sc:.2f}")
# Compound impact of top/bottom 20 days
sorted_rets = port_ret_final_net.sort_values()
total_days = len(sorted_rets)
eq_without_top20 = (1 + sorted_rets.iloc[:-20]).prod()
eq_without_bottom20 = (1 + sorted_rets.iloc[20:]).prod()
eq_all = (1 + sorted_rets).prod()
print(f"\n Total compound growth factor: {eq_all:.2f}x")
print(f" Without best 20 days: {eq_without_top20:.2f}x")
print(f" Without worst 20 days: {eq_without_bottom20:.2f}x")
# =========================================================================
# 8. Annual returns breakdown
# =========================================================================
print(f"\n{'='*90}")
print(" ANNUAL RETURNS BREAKDOWN")
print(f"{'='*90}")
yearly = port_ret_final_net.groupby(port_ret_final_net.index.year)
print(f"\n {'Year':>6} {'Return':>8} {'Vol':>7} {'MaxDD':>7} {'Days_ON':>8} {'Days_OFF':>9} {'Days_PT':>8}")
print(f" {'-'*55}")
for year, rets in yearly:
ann_r = (1 + rets).prod() ** (252 / len(rets)) - 1 if len(rets) > 0 else 0
vol = rets.std() * np.sqrt(252) if len(rets) > 1 else 0
eq_yr = (1 + rets).cumprod()
mdd = ((eq_yr / eq_yr.cummax()) - 1).min()
yr_class = day_class.loc[rets.index]
n_on = (yr_class == "risk_on").sum()
n_off = ((yr_class == "risk_off") | (yr_class == "risk_off_shy")).sum()
n_pt = (yr_class == "pt_park").sum()
print(f" {year:>6} {ann_r:>+7.1%} {vol:>6.1%} {mdd:>+6.1%} {n_on:>8} {n_off:>9} {n_pt:>8}")
if __name__ == "__main__":
main()