"""P0 robustness validation for TrendRiderV3. P0.1 Walk-forward / OOS split — IS = 2015-2020, OOS = 2021-2026-05. Optimize parameters on IS by CAGR, evaluate the IS-best config on OOS, then compare to the default config evaluated on the same windows. P0.2 Block bootstrap on daily returns (block_len=21, n_boot=5000) to compute CIs for CAGR / Sharpe / MaxDD / Calmar / FinalMultiple. P0.3 De-leveraged comparison — replace risk_on=(TQQQ, UPRO) with (SPY, QQQ) to isolate timing edge from leverage edge. Compare to SPY/QQQ B&H. Run: uv run python -m research.trend_rider_p0 """ from __future__ import annotations import argparse import os import sys from dataclasses import asdict from itertools import product import numpy as np import pandas as pd sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from research.trend_rider_robustness import ( Evaluation, buy_hold_weights, evaluate_strategy, evaluate_weights, load_price_panel, portfolio_returns, ) from strategies.permanent import TrendRiderV3 IS_START = "2015-01-02" IS_END = "2020-12-31" OOS_START = "2021-01-01" OOS_END = "2026-05-07" def _fmt_pct(x: float) -> str: return f"{x * 100:7.2f}%" def _print_eval(label: str, ev: Evaluation) -> None: print( f" {label:<24s} " f"CAGR {_fmt_pct(ev.cagr)} " f"Sharpe {ev.sharpe:5.2f} " f"MDD {_fmt_pct(ev.max_drawdown)} " f"Calmar {ev.calmar:5.2f} " f"FinalX {ev.final_multiple:6.2f} " f"Switches {ev.switches:4d}" ) # --------------------------------------------------------------------------- # P0.1 — Walk-forward / OOS # --------------------------------------------------------------------------- def is_oos_grid() -> list[dict]: """Slightly larger sweep than default to expose IS-optimal corners.""" return [ { "vol_enter": ve, "vol_exit": vx, "dd_stop": dd, "peak_enter": pe, "mom_lookback": mom, "regime_min_hold": mh, "stop_loss_pct": sl, } for ve, vx, dd, pe, mom, mh, sl in product( [0.12, 0.14, 0.16], [0.20], [0.04, 0.05, 0.07], [0.01, 0.02, 0.03], [42, 63, 84], [10, 15, 20], [0.10, 0.15, 0.20], ) ] def walk_forward(prices: pd.DataFrame, transaction_cost: float = 0.001) -> dict: """Optimize on IS, evaluate IS-best on OOS, compare to defaults.""" grid = is_oos_grid() is_rows = [] for kwargs in grid: strat = TrendRiderV3(**kwargs) weights = strat.generate_signals(prices) ev = evaluate_weights( "is", weights, prices[weights.columns], transaction_cost=transaction_cost, start=IS_START, end=IS_END, ) row = asdict(ev) row.update(kwargs) is_rows.append(row) is_df = pd.DataFrame(is_rows).sort_values("cagr", ascending=False).reset_index(drop=True) is_top = is_df.iloc[0] is_best_kwargs = {k: is_top[k] for k in grid[0].keys()} # Cast numeric grid values to native types is_best_kwargs = { k: (int(v) if isinstance(v, (int, np.integer)) else float(v)) for k, v in is_best_kwargs.items() } # mom_lookback / regime_min_hold are ints for k in ("mom_lookback", "regime_min_hold"): is_best_kwargs[k] = int(is_best_kwargs[k]) # OOS evaluation of IS-best strat_isbest = TrendRiderV3(**is_best_kwargs) w_isbest = strat_isbest.generate_signals(prices) isbest_oos = evaluate_weights( "is_best_OOS", w_isbest, prices[w_isbest.columns], transaction_cost=transaction_cost, start=OOS_START, end=OOS_END, ) # Defaults on IS and OOS default = TrendRiderV3() w_def = default.generate_signals(prices) def_is = evaluate_weights( "default_IS", w_def, prices[w_def.columns], transaction_cost=transaction_cost, start=IS_START, end=IS_END, ) def_oos = evaluate_weights( "default_OOS", w_def, prices[w_def.columns], transaction_cost=transaction_cost, start=OOS_START, end=OOS_END, ) # SPY B&H benchmark on each window spy_w = buy_hold_weights(prices, "SPY") qqq_w = buy_hold_weights(prices, "QQQ") spy_is = evaluate_weights("spy_IS", spy_w, prices[spy_w.columns], 0.0, IS_START, IS_END) spy_oos = evaluate_weights("spy_OOS", spy_w, prices[spy_w.columns], 0.0, OOS_START, OOS_END) qqq_is = evaluate_weights("qqq_IS", qqq_w, prices[qqq_w.columns], 0.0, IS_START, IS_END) qqq_oos = evaluate_weights("qqq_OOS", qqq_w, prices[qqq_w.columns], 0.0, OOS_START, OOS_END) # Decay metric: how much CAGR fell from IS-fitted to OOS return { "is_grid": is_df, "is_best_kwargs": is_best_kwargs, "is_best_IS_cagr": float(is_top["cagr"]), "is_best_OOS": isbest_oos, "default_IS": def_is, "default_OOS": def_oos, "spy_IS": spy_is, "spy_OOS": spy_oos, "qqq_IS": qqq_is, "qqq_OOS": qqq_oos, } # --------------------------------------------------------------------------- # P0.2 — Block bootstrap on daily returns # --------------------------------------------------------------------------- def block_bootstrap( returns: pd.Series, n_boot: int = 5000, block_len: int = 21, seed: int = 42, ) -> pd.DataFrame: """Stationary block bootstrap on daily returns. Resamples with replacement in fixed-length blocks to preserve short-horizon autocorrelation / volatility clustering. Returns a DataFrame with columns [cagr, sharpe, max_drawdown, calmar, final_multiple] of length n_boot. """ r = returns.values n = len(r) rng = np.random.default_rng(seed) n_blocks = int(np.ceil(n / block_len)) # Pre-allocate cagrs = np.empty(n_boot) sharpes = np.empty(n_boot) mdds = np.empty(n_boot) finals = np.empty(n_boot) span_years = n / 252.0 for b in range(n_boot): starts = rng.integers(0, n - block_len + 1, size=n_blocks) idx = (starts[:, None] + np.arange(block_len)[None, :]).ravel()[:n] sample = r[idx] equity = np.cumprod(1.0 + sample) finals[b] = equity[-1] cagrs[b] = equity[-1] ** (1.0 / span_years) - 1.0 std = sample.std(ddof=1) sharpes[b] = (sample.mean() / std * np.sqrt(252)) if std > 0 else 0.0 running_max = np.maximum.accumulate(equity) mdds[b] = float(np.min(equity / running_max - 1.0)) df = pd.DataFrame({ "cagr": cagrs, "sharpe": sharpes, "max_drawdown": mdds, "final_multiple": finals, }) df["calmar"] = df["cagr"] / df["max_drawdown"].abs().replace(0.0, np.nan) return df def bootstrap_summary(boot: pd.DataFrame) -> pd.DataFrame: qs = [0.025, 0.05, 0.25, 0.50, 0.75, 0.95, 0.975] summary = boot.quantile(qs).T summary.columns = [f"p{int(q * 1000):04d}" for q in qs] summary["mean"] = boot.mean() summary["std"] = boot.std(ddof=1) summary["prob_neg_cagr"] = np.nan summary["prob_below_spy"] = np.nan return summary # --------------------------------------------------------------------------- # P0.3 — De-leveraged comparison # --------------------------------------------------------------------------- def deleveraged_evaluations( prices: pd.DataFrame, transaction_cost: float = 0.001 ) -> dict[str, Evaluation]: out: dict[str, Evaluation] = {} # Standard (leveraged) levered = TrendRiderV3() w_lev = levered.generate_signals(prices) out["TR_v3_leveraged"] = evaluate_weights( "TR_v3_leveraged", w_lev, prices[w_lev.columns], transaction_cost=transaction_cost, start=IS_START, end=OOS_END, ) # No leverage on equity (risk_on = SPY/QQQ), commodity risk_off nolev = TrendRiderV3(risk_on=("SPY", "QQQ")) w_nl = nolev.generate_signals(prices) out["TR_v3_nolev_SPYQQQ"] = evaluate_weights( "TR_v3_nolev_SPYQQQ", w_nl, prices[w_nl.columns], transaction_cost=transaction_cost, start=IS_START, end=OOS_END, ) # No leverage AND cash-only risk_off (most conservative — pure timing edge on equity) nolev_shy = TrendRiderV3(risk_on=("SPY", "QQQ"), risk_off=("SHY",)) w_nl_shy = nolev_shy.generate_signals(prices) out["TR_v3_nolev_SHYoff"] = evaluate_weights( "TR_v3_nolev_SHYoff", w_nl_shy, prices[w_nl_shy.columns], transaction_cost=transaction_cost, start=IS_START, end=OOS_END, ) # Buy-and-hold benchmarks spy_w = buy_hold_weights(prices, "SPY") qqq_w = buy_hold_weights(prices, "QQQ") out["SPY_BH"] = evaluate_weights("SPY_BH", spy_w, prices[spy_w.columns], 0.0, IS_START, OOS_END) out["QQQ_BH"] = evaluate_weights("QQQ_BH", qqq_w, prices[qqq_w.columns], 0.0, IS_START, OOS_END) # 50/50 SPY+QQQ rebalanced (passive, no timing) — fairer "equity passive" benchmark cols = [c for c in ["SPY", "QQQ"] if c in prices.columns] if len(cols) == 2: eq_w = pd.DataFrame(0.5, index=prices.index, columns=cols) out["SPY_QQQ_5050"] = evaluate_weights( "SPY_QQQ_5050", eq_w, prices[cols], 0.0, IS_START, OOS_END ) return out # --------------------------------------------------------------------------- # main # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser(description="P0 validation suite for TrendRiderV3") parser.add_argument("--n-boot", type=int, default=5000) parser.add_argument("--block-len", type=int, default=21) parser.add_argument("--transaction-cost", type=float, default=0.001) parser.add_argument("--out-dir", default="data") args = parser.parse_args() os.makedirs(args.out_dir, exist_ok=True) prices = load_price_panel() print(f"Panel: {prices.index.min().date()} to {prices.index.max().date()}, " f"{prices.shape[1]} columns") # ---------- P0.1 ---------- print("\n" + "=" * 78) print("P0.1 Walk-forward / Out-of-sample") print(f" IS = {IS_START} → {IS_END}") print(f" OOS = {OOS_START} → {OOS_END}") print("=" * 78) wf = walk_forward(prices, transaction_cost=args.transaction_cost) is_grid = wf["is_grid"] is_grid.to_csv(os.path.join(args.out_dir, "p0_walkforward_isgrid.csv"), index=False) print(f"\nGrid size: {len(is_grid)} | top 3 by IS CAGR:") cols_show = ["cagr", "sharpe", "max_drawdown", "vol_enter", "dd_stop", "peak_enter", "mom_lookback", "regime_min_hold", "stop_loss_pct"] print(is_grid[cols_show].head(3).to_string(index=False)) print(f"\nIS-best params: {wf['is_best_kwargs']}") print(f" IS CAGR : {_fmt_pct(wf['is_best_IS_cagr'])}") print(f" OOS perf of IS-best params:") _print_eval("IS-best (OOS)", wf["is_best_OOS"]) _print_eval("Default (IS)", wf["default_IS"]) _print_eval("Default (OOS)", wf["default_OOS"]) _print_eval("SPY B&H (IS)", wf["spy_IS"]) _print_eval("SPY B&H (OOS)", wf["spy_OOS"]) _print_eval("QQQ B&H (IS)", wf["qqq_IS"]) _print_eval("QQQ B&H (OOS)", wf["qqq_OOS"]) decay = wf["is_best_IS_cagr"] - wf["is_best_OOS"].cagr print(f"\n Performance decay (IS→OOS) of IS-best : {_fmt_pct(decay)}") decay_def = wf["default_IS"].cagr - wf["default_OOS"].cagr print(f" Performance decay (IS→OOS) of default : {_fmt_pct(decay_def)}") # ---------- P0.2 ---------- print("\n" + "=" * 78) print("P0.2 Block bootstrap (block_len=" f"{args.block_len}, n_boot={args.n_boot})") print("=" * 78) default = TrendRiderV3() weights = default.generate_signals(prices) rets = portfolio_returns(weights, prices[weights.columns], transaction_cost=args.transaction_cost) rets = rets[(rets.index >= IS_START) & (rets.index <= OOS_END)] print(f" Returns series : {len(rets)} days, " f"mean {rets.mean()*252:.4f}, vol {rets.std(ddof=1)*np.sqrt(252):.4f}") boot_full = block_bootstrap( rets, n_boot=args.n_boot, block_len=args.block_len, seed=42 ) boot_full.to_csv(os.path.join(args.out_dir, "p0_bootstrap_full.csv"), index=False) print("\nFull-sample bootstrap (2015-2026):") print(bootstrap_summary(boot_full).round(4).to_string()) # Probability statements spy_oos_cagr = wf["spy_OOS"].cagr p_below_spy = float((boot_full["cagr"] < spy_oos_cagr).mean()) p_neg = float((boot_full["cagr"] < 0).mean()) p_dd_50 = float((boot_full["max_drawdown"] < -0.50).mean()) p_sharpe_below_05 = float((boot_full["sharpe"] < 0.5).mean()) print( f"\n P(CAGR<0) = {p_neg:.3f}\n" f" P(CAGR= OOS_START] boot_oos = block_bootstrap( rets_oos, n_boot=args.n_boot, block_len=args.block_len, seed=43 ) print("\nOOS-only bootstrap (2021-2026):") print(bootstrap_summary(boot_oos).round(4).to_string()) # ---------- P0.3 ---------- print("\n" + "=" * 78) print("P0.3 De-leveraged comparison") print("=" * 78) de = deleveraged_evaluations(prices, transaction_cost=args.transaction_cost) rows = [] for name, ev in de.items(): rows.append(asdict(ev)) _print_eval(name, ev) pd.DataFrame(rows).to_csv(os.path.join(args.out_dir, "p0_deleveraged.csv"), index=False) # Also break by IS / OOS print("\n Same comparison, split IS vs OOS:") for label, (start, end) in {"IS": (IS_START, IS_END), "OOS": (OOS_START, OOS_END)}.items(): print(f" --- {label} ({start} → {end}) ---") subs = {} # Recompute on the slice for nm, ctor in { "TR_v3_leveraged": TrendRiderV3(), "TR_v3_nolev_SPYQQQ": TrendRiderV3(risk_on=("SPY", "QQQ")), "TR_v3_nolev_SHYoff": TrendRiderV3(risk_on=("SPY", "QQQ"), risk_off=("SHY",)), }.items(): w = ctor.generate_signals(prices) subs[nm] = evaluate_weights( nm, w, prices[w.columns], args.transaction_cost, start, end ) spy_w = buy_hold_weights(prices, "SPY") qqq_w = buy_hold_weights(prices, "QQQ") subs["SPY_BH"] = evaluate_weights("SPY_BH", spy_w, prices[spy_w.columns], 0.0, start, end) subs["QQQ_BH"] = evaluate_weights("QQQ_BH", qqq_w, prices[qqq_w.columns], 0.0, start, end) for nm, ev in subs.items(): _print_eval(nm, ev) if __name__ == "__main__": main()