Add 28 research scripts covering DCA simulation, momentum evaluation, Sharpe optimization, trend rider analysis, and US fundamentals exploration.
420 lines
15 KiB
Python
420 lines
15 KiB
Python
"""P0 robustness validation for TrendRiderV3.

P0.1  Walk-forward / OOS split — IS = 2015-2020, OOS = 2021-2026-05.
      Optimize parameters on IS by CAGR, evaluate the IS-best config on OOS,
      then compare to the default config evaluated on the same windows.
P0.2  Block bootstrap on daily returns (block_len=21, n_boot=5000) to compute
      CIs for CAGR / Sharpe / MaxDD / Calmar / FinalMultiple.
P0.3  De-leveraged comparison — replace risk_on=(TQQQ, UPRO) with (SPY, QQQ)
      to isolate timing edge from leverage edge. Compare to SPY/QQQ B&H.

Run:
    uv run python -m research.trend_rider_p0
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from dataclasses import asdict
|
|
from itertools import product
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from research.trend_rider_robustness import (
|
|
Evaluation,
|
|
buy_hold_weights,
|
|
evaluate_strategy,
|
|
evaluate_weights,
|
|
load_price_panel,
|
|
portfolio_returns,
|
|
)
|
|
from strategies.permanent import TrendRiderV3
|
|
|
|
|
|
# In-sample window: parameters are selected on this date range.
IS_START, IS_END = "2015-01-02", "2020-12-31"
# Out-of-sample window: hold-out range used to score the selected parameters.
OOS_START, OOS_END = "2021-01-01", "2026-05-07"
|
|
|
|
|
|
def _fmt_pct(x: float) -> str:
|
|
return f"{x * 100:7.2f}%"
|
|
|
|
|
|
def _print_eval(label: str, ev: Evaluation) -> None:
    """Print a single fixed-width summary line for one evaluation.

    The line starts with ``label`` left-aligned in 24 columns, followed by the
    CAGR, Sharpe, max drawdown, Calmar, final multiple and switch count read
    from ``ev``.
    """
    metrics = (
        f"CAGR {_fmt_pct(ev.cagr)}",
        f"Sharpe {ev.sharpe:5.2f}",
        f"MDD {_fmt_pct(ev.max_drawdown)}",
        f"Calmar {ev.calmar:5.2f}",
        f"FinalX {ev.final_multiple:6.2f}",
        f"Switches {ev.switches:4d}",
    )
    # A single space separates the metric groups, exactly as in the header.
    print(f" {label:<24s} " + " ".join(metrics))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# P0.1 — Walk-forward / OOS
|
|
# ---------------------------------------------------------------------------
|
|
def is_oos_grid() -> list[dict]:
    """Return the in-sample parameter sweep as a list of keyword dicts.

    Slightly larger sweep than default to expose IS-optimal corners. The grid
    is the Cartesian product of the candidate values below: three values per
    parameter except ``vol_exit``, which is pinned to 0.20, giving
    3**6 = 729 combinations. The last-listed parameter varies fastest.
    """
    axes = {
        "vol_enter": [0.12, 0.14, 0.16],
        "vol_exit": [0.20],
        "dd_stop": [0.04, 0.05, 0.07],
        "peak_enter": [0.01, 0.02, 0.03],
        "mom_lookback": [42, 63, 84],
        "regime_min_hold": [10, 15, 20],
        "stop_loss_pct": [0.10, 0.15, 0.20],
    }
    names = tuple(axes)
    return [dict(zip(names, combo)) for combo in product(*axes.values())]
|
|
|
|
|
|
def walk_forward(prices: pd.DataFrame, transaction_cost: float = 0.001) -> dict:
    """Select parameters on the IS window and score them on the OOS window.

    Every configuration from ``is_oos_grid()`` is evaluated on
    ``IS_START``..``IS_END``; the configuration with the highest IS CAGR is
    then evaluated on ``OOS_START``..``OOS_END``. The default ``TrendRiderV3``
    configuration and SPY / QQQ buy-and-hold are evaluated on both windows
    for comparison. Signals are always generated from the full ``prices``
    panel; only the evaluation window changes.

    Args:
        prices: Price panel passed to the strategies and to the evaluators.
        transaction_cost: Cost passed to ``evaluate_weights`` for strategy
            evaluations. Buy-and-hold benchmarks are evaluated with 0.0.

    Returns:
        A dict with keys ``is_grid`` (DataFrame of all IS evaluations sorted
        by CAGR, best first), ``is_best_kwargs``, ``is_best_IS_cagr``,
        ``is_best_OOS``, ``default_IS``, ``default_OOS``, ``spy_IS``,
        ``spy_OOS``, ``qqq_IS`` and ``qqq_OOS``.
    """
    param_grid = is_oos_grid()

    def _score(tag, w, start, end):
        # Strategy evaluation on one window, with transaction costs applied.
        return evaluate_weights(
            tag,
            w,
            prices[w.columns],
            transaction_cost=transaction_cost,
            start=start,
            end=end,
        )

    def _benchmark(tag, w, start, end):
        # Buy-and-hold evaluation on one window, cost-free.
        return evaluate_weights(tag, w, prices[w.columns], 0.0, start, end)

    # --- In-sample sweep -------------------------------------------------
    records = []
    for params in param_grid:
        signals = TrendRiderV3(**params).generate_signals(prices)
        record = asdict(_score("is", signals, IS_START, IS_END))
        record.update(params)
        records.append(record)

    is_df = (
        pd.DataFrame(records)
        .sort_values("cagr", ascending=False)
        .reset_index(drop=True)
    )
    winner = is_df.iloc[0]

    # Values read back from a DataFrame row may be numpy scalars (or floats
    # where the grid had ints), so convert them to native Python numbers;
    # the two window-length parameters must be ints.
    int_params = ("mom_lookback", "regime_min_hold")
    is_best_kwargs = {}
    for key in param_grid[0]:
        raw = winner[key]
        if key in int_params or isinstance(raw, (int, np.integer)):
            is_best_kwargs[key] = int(raw)
        else:
            is_best_kwargs[key] = float(raw)

    # --- IS-best configuration, scored out of sample ----------------------
    w_isbest = TrendRiderV3(**is_best_kwargs).generate_signals(prices)
    isbest_oos = _score("is_best_OOS", w_isbest, OOS_START, OOS_END)

    # --- Default configuration on both windows ----------------------------
    w_def = TrendRiderV3().generate_signals(prices)
    def_is = _score("default_IS", w_def, IS_START, IS_END)
    def_oos = _score("default_OOS", w_def, OOS_START, OOS_END)

    # --- Buy-and-hold benchmarks on both windows --------------------------
    spy_w = buy_hold_weights(prices, "SPY")
    qqq_w = buy_hold_weights(prices, "QQQ")
    spy_is = _benchmark("spy_IS", spy_w, IS_START, IS_END)
    spy_oos = _benchmark("spy_OOS", spy_w, OOS_START, OOS_END)
    qqq_is = _benchmark("qqq_IS", qqq_w, IS_START, IS_END)
    qqq_oos = _benchmark("qqq_OOS", qqq_w, OOS_START, OOS_END)

    # The IS -> OOS decay itself is not computed here; callers derive it from
    # "is_best_IS_cagr" and the CAGR of "is_best_OOS".
    return {
        "is_grid": is_df,
        "is_best_kwargs": is_best_kwargs,
        "is_best_IS_cagr": float(winner["cagr"]),
        "is_best_OOS": isbest_oos,
        "default_IS": def_is,
        "default_OOS": def_oos,
        "spy_IS": spy_is,
        "spy_OOS": spy_oos,
        "qqq_IS": qqq_is,
        "qqq_OOS": qqq_oos,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# P0.2 — Block bootstrap on daily returns
|
|
# ---------------------------------------------------------------------------
|
|
def block_bootstrap(
    returns: pd.Series,
    n_boot: int = 5000,
    block_len: int = 21,
    seed: int = 42,
) -> pd.DataFrame:
    """Moving-block bootstrap of a daily return series.

    Each resample is built by drawing ``ceil(n / block_len)`` block start
    positions uniformly (with replacement) from ``0 .. n - block_len``,
    concatenating the fixed-length blocks and truncating to the original
    length ``n``. Keeping contiguous blocks preserves short-horizon
    autocorrelation / volatility clustering. Note this is the fixed-length
    (moving-block) scheme, not the stationary bootstrap with random block
    lengths.

    Args:
        returns: Daily simple returns (Series or any 1-D array-like).
        n_boot: Number of bootstrap resamples.
        block_len: Length of each resampled block, in observations. If it
            exceeds ``len(returns)`` it is clamped to ``len(returns)`` with a
            warning; in that case (and when it equals ``len(returns)``) every
            resample is the original series, so the distribution is degenerate.
        seed: Seed for ``numpy.random.default_rng``.

    Returns:
        DataFrame with ``n_boot`` rows and columns
        ``[cagr, sharpe, max_drawdown, final_multiple, calmar]``. CAGR and
        Sharpe are annualized with 252 periods per year. Max drawdown is
        measured against the running peak of the resampled equity curve.
        Calmar is NaN where the drawdown is exactly zero.

    Raises:
        ValueError: If ``returns`` is empty or ``block_len`` is less than 1.
    """
    import warnings

    r = np.asarray(returns, dtype=float)
    n = r.size
    if n == 0:
        raise ValueError("block_bootstrap requires a non-empty return series")
    if block_len < 1:
        raise ValueError(f"block_len must be >= 1, got {block_len}")
    if block_len > n:
        # Without clamping, rng.integers would fail with an opaque
        # "low >= high" error because no full block fits in the series.
        warnings.warn(
            f"block_len={block_len} exceeds the series length {n}; "
            f"clamping to {n} (every resample equals the original series)",
            stacklevel=2,
        )
        block_len = n

    rng = np.random.default_rng(seed)
    n_blocks = int(np.ceil(n / block_len))

    # Pre-allocate per-resample statistics.
    cagrs = np.empty(n_boot)
    sharpes = np.empty(n_boot)
    mdds = np.empty(n_boot)
    finals = np.empty(n_boot)

    span_years = n / 252.0
    offsets = np.arange(block_len)  # loop-invariant within-block offsets

    # Start positions are drawn once per resample (not in one batched call)
    # so results for a given seed match earlier runs.
    for b in range(n_boot):
        starts = rng.integers(0, n - block_len + 1, size=n_blocks)
        idx = (starts[:, None] + offsets[None, :]).ravel()[:n]
        sample = r[idx]
        equity = np.cumprod(1.0 + sample)
        finals[b] = equity[-1]
        cagrs[b] = equity[-1] ** (1.0 / span_years) - 1.0
        std = sample.std(ddof=1)
        # A zero (or NaN) volatility sample gets Sharpe 0 instead of inf/NaN.
        sharpes[b] = (sample.mean() / std * np.sqrt(252)) if std > 0 else 0.0
        running_max = np.maximum.accumulate(equity)
        mdds[b] = float(np.min(equity / running_max - 1.0))

    df = pd.DataFrame({
        "cagr": cagrs,
        "sharpe": sharpes,
        "max_drawdown": mdds,
        "final_multiple": finals,
    })
    # Avoid division by zero when a resample never draws down.
    df["calmar"] = df["cagr"] / df["max_drawdown"].abs().replace(0.0, np.nan)
    return df
|
|
|
|
|
|
def bootstrap_summary(boot: pd.DataFrame, spy_cagr: float | None = None) -> pd.DataFrame:
    """Summarize a bootstrap distribution, one row per metric.

    Args:
        boot: Bootstrap draws, one column per metric (e.g. the output of
            ``block_bootstrap``).
        spy_cagr: Optional benchmark CAGR. When given, the share of draws
            whose ``cagr`` falls below it is reported in ``prob_below_spy``.

    Returns:
        DataFrame indexed by metric name with quantile columns
        ``p0025, p0050, p0250, p0500, p0750, p0950, p0975`` (per-mille
        labels), ``mean``, ``std`` (ddof=1), ``prob_neg_cagr`` and
        ``prob_below_spy``. The two probability columns are only meaningful
        for CAGR, so they are filled on the ``cagr`` row and NaN elsewhere;
        ``prob_below_spy`` stays NaN when ``spy_cagr`` is not supplied.
    """
    qs = [0.025, 0.05, 0.25, 0.50, 0.75, 0.95, 0.975]
    summary = boot.quantile(qs).T
    # round() rather than int(): q * 1000 is not always exact in floating
    # point, and truncation would then mislabel the column.
    summary.columns = [f"p{round(q * 1000):04d}" for q in qs]
    summary["mean"] = boot.mean()
    summary["std"] = boot.std(ddof=1)
    summary["prob_neg_cagr"] = np.nan
    summary["prob_below_spy"] = np.nan
    if "cagr" in boot.columns:
        summary.loc["cagr", "prob_neg_cagr"] = float((boot["cagr"] < 0).mean())
        if spy_cagr is not None:
            summary.loc["cagr", "prob_below_spy"] = float(
                (boot["cagr"] < spy_cagr).mean()
            )
    return summary
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# P0.3 — De-leveraged comparison
|
|
# ---------------------------------------------------------------------------
|
|
def deleveraged_evaluations(
    prices: pd.DataFrame, transaction_cost: float = 0.001
) -> dict[str, Evaluation]:
    """Evaluate leveraged vs. de-leveraged TrendRiderV3 variants and benchmarks.

    All evaluations cover ``IS_START``..``OOS_END``. Strategy variants are
    charged ``transaction_cost``; passive benchmarks are evaluated with 0.0.

    Returns:
        Dict keyed by label, in insertion order: ``TR_v3_leveraged``,
        ``TR_v3_nolev_SPYQQQ``, ``TR_v3_nolev_SHYoff``, ``SPY_BH``,
        ``QQQ_BH`` and, when both SPY and QQQ are in ``prices``,
        ``SPY_QQQ_5050``.
    """
    results: dict[str, Evaluation] = {}

    # Constructor overrides per variant:
    #   leveraged - the strategy defaults;
    #   nolev_SPYQQQ - unlevered equity sleeve, risk_off left at its default;
    #   nolev_SHYoff - unlevered equity sleeve AND SHY-only risk_off
    #                  (most conservative: pure timing edge on equity).
    variants = {
        "TR_v3_leveraged": {},
        "TR_v3_nolev_SPYQQQ": {"risk_on": ("SPY", "QQQ")},
        "TR_v3_nolev_SHYoff": {"risk_on": ("SPY", "QQQ"), "risk_off": ("SHY",)},
    }
    for tag, overrides in variants.items():
        w = TrendRiderV3(**overrides).generate_signals(prices)
        results[tag] = evaluate_weights(
            tag,
            w,
            prices[w.columns],
            transaction_cost=transaction_cost,
            start=IS_START,
            end=OOS_END,
        )

    # Buy-and-hold benchmarks.
    passive = {
        "SPY_BH": buy_hold_weights(prices, "SPY"),
        "QQQ_BH": buy_hold_weights(prices, "QQQ"),
    }
    for tag, w in passive.items():
        results[tag] = evaluate_weights(
            tag, w, prices[w.columns], 0.0, IS_START, OOS_END
        )

    # 50/50 SPY+QQQ (passive, no timing) - a fairer "equity passive"
    # benchmark; only built when both tickers are available.
    pair = [ticker for ticker in ["SPY", "QQQ"] if ticker in prices.columns]
    if len(pair) == 2:
        half_each = pd.DataFrame(0.5, index=prices.index, columns=pair)
        results["SPY_QQQ_5050"] = evaluate_weights(
            "SPY_QQQ_5050", half_each, prices[pair], 0.0, IS_START, OOS_END
        )

    return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# main
|
|
# ---------------------------------------------------------------------------
|
|
def main() -> None:
    """Run the P0 validation suite: print a report and write CSV artifacts.

    Command-line options: ``--n-boot``, ``--block-len``,
    ``--transaction-cost`` and ``--out-dir`` (created if missing).

    Sections:
        P0.1  IS/OOS split via ``walk_forward``; writes
              ``p0_walkforward_isgrid.csv``.
        P0.2  Block bootstrap of the default strategy's daily returns on the
              full window and on the OOS window; writes
              ``p0_bootstrap_full.csv``. Each bootstrap is compared against
              SPY buy-and-hold CAGR over the *same* window.
        P0.3  Leveraged vs. de-leveraged comparison on the full window
              (writes ``p0_deleveraged.csv``) and split by IS / OOS.
    """
    parser = argparse.ArgumentParser(description="P0 validation suite for TrendRiderV3")
    parser.add_argument("--n-boot", type=int, default=5000)
    parser.add_argument("--block-len", type=int, default=21)
    parser.add_argument("--transaction-cost", type=float, default=0.001)
    parser.add_argument("--out-dir", default="data")
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)
    prices = load_price_panel()
    print(f"Panel: {prices.index.min().date()} to {prices.index.max().date()}, "
          f"{prices.shape[1]} columns")

    # ---------- P0.1 ----------
    print("\n" + "=" * 78)
    print("P0.1 Walk-forward / Out-of-sample")
    print(f" IS = {IS_START} → {IS_END}")
    print(f" OOS = {OOS_START} → {OOS_END}")
    print("=" * 78)

    wf = walk_forward(prices, transaction_cost=args.transaction_cost)
    is_grid = wf["is_grid"]
    is_grid.to_csv(os.path.join(args.out_dir, "p0_walkforward_isgrid.csv"), index=False)
    print(f"\nGrid size: {len(is_grid)} | top 3 by IS CAGR:")
    cols_show = ["cagr", "sharpe", "max_drawdown", "vol_enter", "dd_stop", "peak_enter",
                 "mom_lookback", "regime_min_hold", "stop_loss_pct"]
    print(is_grid[cols_show].head(3).to_string(index=False))

    print(f"\nIS-best params: {wf['is_best_kwargs']}")
    print(f" IS CAGR : {_fmt_pct(wf['is_best_IS_cagr'])}")
    print(" OOS perf of IS-best params:")
    _print_eval("IS-best (OOS)", wf["is_best_OOS"])
    _print_eval("Default (IS)", wf["default_IS"])
    _print_eval("Default (OOS)", wf["default_OOS"])
    _print_eval("SPY B&H (IS)", wf["spy_IS"])
    _print_eval("SPY B&H (OOS)", wf["spy_OOS"])
    _print_eval("QQQ B&H (IS)", wf["qqq_IS"])
    _print_eval("QQQ B&H (OOS)", wf["qqq_OOS"])

    # Decay = IS CAGR minus OOS CAGR; a large positive value for the IS-best
    # configuration relative to the default suggests the sweep overfit.
    decay = wf["is_best_IS_cagr"] - wf["is_best_OOS"].cagr
    print(f"\n Performance decay (IS→OOS) of IS-best : {_fmt_pct(decay)}")
    decay_def = wf["default_IS"].cagr - wf["default_OOS"].cagr
    print(f" Performance decay (IS→OOS) of default : {_fmt_pct(decay_def)}")

    # ---------- P0.2 ----------
    print("\n" + "=" * 78)
    print("P0.2 Block bootstrap (block_len="
          f"{args.block_len}, n_boot={args.n_boot})")
    print("=" * 78)

    default = TrendRiderV3()
    weights = default.generate_signals(prices)
    rets = portfolio_returns(weights, prices[weights.columns],
                             transaction_cost=args.transaction_cost)
    rets = rets[(rets.index >= IS_START) & (rets.index <= OOS_END)]
    print(f" Returns series : {len(rets)} days, "
          f"mean {rets.mean()*252:.4f}, vol {rets.std(ddof=1)*np.sqrt(252):.4f}")

    boot_full = block_bootstrap(
        rets, n_boot=args.n_boot, block_len=args.block_len, seed=42
    )
    boot_full.to_csv(os.path.join(args.out_dir, "p0_bootstrap_full.csv"), index=False)
    print("\nFull-sample bootstrap (2015-2026):")
    print(bootstrap_summary(boot_full).round(4).to_string())

    # Probability statements. The full-sample bootstrap is compared with SPY
    # buy-and-hold over the same full window; comparing it with SPY's
    # OOS-only CAGR would mix two different periods.
    spy_w = buy_hold_weights(prices, "SPY")
    spy_full_cagr = evaluate_weights(
        "SPY_BH", spy_w, prices[spy_w.columns], 0.0, IS_START, OOS_END
    ).cagr
    p_below_spy = float((boot_full["cagr"] < spy_full_cagr).mean())
    p_neg = float((boot_full["cagr"] < 0).mean())
    p_dd_50 = float((boot_full["max_drawdown"] < -0.50).mean())
    p_sharpe_below_05 = float((boot_full["sharpe"] < 0.5).mean())
    print(
        f"\n P(CAGR<0) = {p_neg:.3f}\n"
        f" P(CAGR<SPY full={spy_full_cagr:.3f}) = {p_below_spy:.3f}\n"
        f" P(MaxDD<-50%) = {p_dd_50:.3f}\n"
        f" P(Sharpe<0.5) = {p_sharpe_below_05:.3f}"
    )

    # OOS-only bootstrap (the more honest "future" estimate)
    rets_oos = rets[rets.index >= OOS_START]
    boot_oos = block_bootstrap(
        rets_oos, n_boot=args.n_boot, block_len=args.block_len, seed=43
    )
    print("\nOOS-only bootstrap (2021-2026):")
    print(bootstrap_summary(boot_oos).round(4).to_string())

    # Window-matched benchmark for the OOS bootstrap: SPY over the OOS window.
    spy_oos_cagr = wf["spy_OOS"].cagr
    p_oos_below_spy = float((boot_oos["cagr"] < spy_oos_cagr).mean())
    print(f"\n P(CAGR<SPY OOS={spy_oos_cagr:.3f}) = {p_oos_below_spy:.3f}")

    # ---------- P0.3 ----------
    print("\n" + "=" * 78)
    print("P0.3 De-leveraged comparison")
    print("=" * 78)
    de = deleveraged_evaluations(prices, transaction_cost=args.transaction_cost)
    rows = []
    for name, ev in de.items():
        rows.append(asdict(ev))
        _print_eval(name, ev)
    pd.DataFrame(rows).to_csv(os.path.join(args.out_dir, "p0_deleveraged.csv"), index=False)

    # Also break by IS / OOS
    print("\n Same comparison, split IS vs OOS:")
    # Signals and buy-and-hold weights are generated from the full panel and
    # do not depend on the evaluation window, so build them once and only
    # re-evaluate per window.
    strategies = {
        "TR_v3_leveraged": TrendRiderV3(),
        "TR_v3_nolev_SPYQQQ": TrendRiderV3(risk_on=("SPY", "QQQ")),
        "TR_v3_nolev_SHYoff": TrendRiderV3(risk_on=("SPY", "QQQ"), risk_off=("SHY",)),
    }
    strategy_weights = {
        nm: strat.generate_signals(prices) for nm, strat in strategies.items()
    }
    qqq_w = buy_hold_weights(prices, "QQQ")
    windows = {"IS": (IS_START, IS_END), "OOS": (OOS_START, OOS_END)}
    for label, (start, end) in windows.items():
        print(f" --- {label} ({start} → {end}) ---")
        subs = {
            nm: evaluate_weights(
                nm, w, prices[w.columns], args.transaction_cost, start, end
            )
            for nm, w in strategy_weights.items()
        }
        subs["SPY_BH"] = evaluate_weights("SPY_BH", spy_w, prices[spy_w.columns], 0.0, start, end)
        subs["QQQ_BH"] = evaluate_weights("QQQ_BH", qqq_w, prices[qqq_w.columns], 0.0, start, end)
        for nm, ev in subs.items():
            _print_eval(nm, ev)


# Run the suite only when executed as a script or via ``python -m``.
if __name__ == "__main__":
    main()
|