Files
quant/research/trend_rider_p0.py
Gahow Wang 541f7bcf5b research: add strategy evaluation and exploration scripts
Add 28 research scripts covering DCA simulation, momentum evaluation,
Sharpe optimization, trend rider analysis, and US fundamentals exploration.
2026-05-14 12:54:08 +08:00

420 lines
15 KiB
Python

"""P0 robustness validation for TrendRiderV3.
P0.1 Walk-forward / OOS split — IS = 2015-2020, OOS = 2021-2026-05.
Optimize parameters on IS by CAGR, evaluate the IS-best config on OOS,
then compare to the default config evaluated on the same windows.
P0.2 Block bootstrap on daily returns (block_len=21, n_boot=5000) to compute
CIs for CAGR / Sharpe / MaxDD / Calmar / FinalMultiple.
P0.3 De-leveraged comparison — replace risk_on=(TQQQ, UPRO) with (SPY, QQQ)
to isolate timing edge from leverage edge. Compare to SPY/QQQ B&H.
Run:
uv run python -m research.trend_rider_p0
"""
from __future__ import annotations
import argparse
import os
import sys
from dataclasses import asdict
from itertools import product
import numpy as np
import pandas as pd
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from research.trend_rider_robustness import (
Evaluation,
buy_hold_weights,
evaluate_strategy,
evaluate_weights,
load_price_panel,
portfolio_returns,
)
from strategies.permanent import TrendRiderV3
IS_START = "2015-01-02"
IS_END = "2020-12-31"
OOS_START = "2021-01-01"
OOS_END = "2026-05-07"
def _fmt_pct(x: float) -> str:
return f"{x * 100:7.2f}%"
def _print_eval(label: str, ev: Evaluation) -> None:
print(
f" {label:<24s} "
f"CAGR {_fmt_pct(ev.cagr)} "
f"Sharpe {ev.sharpe:5.2f} "
f"MDD {_fmt_pct(ev.max_drawdown)} "
f"Calmar {ev.calmar:5.2f} "
f"FinalX {ev.final_multiple:6.2f} "
f"Switches {ev.switches:4d}"
)
# ---------------------------------------------------------------------------
# P0.1 — Walk-forward / OOS
# ---------------------------------------------------------------------------
def is_oos_grid() -> list[dict]:
"""Slightly larger sweep than default to expose IS-optimal corners."""
return [
{
"vol_enter": ve,
"vol_exit": vx,
"dd_stop": dd,
"peak_enter": pe,
"mom_lookback": mom,
"regime_min_hold": mh,
"stop_loss_pct": sl,
}
for ve, vx, dd, pe, mom, mh, sl in product(
[0.12, 0.14, 0.16],
[0.20],
[0.04, 0.05, 0.07],
[0.01, 0.02, 0.03],
[42, 63, 84],
[10, 15, 20],
[0.10, 0.15, 0.20],
)
]
def walk_forward(prices: pd.DataFrame, transaction_cost: float = 0.001) -> dict:
"""Optimize on IS, evaluate IS-best on OOS, compare to defaults."""
grid = is_oos_grid()
is_rows = []
for kwargs in grid:
strat = TrendRiderV3(**kwargs)
weights = strat.generate_signals(prices)
ev = evaluate_weights(
"is",
weights,
prices[weights.columns],
transaction_cost=transaction_cost,
start=IS_START,
end=IS_END,
)
row = asdict(ev)
row.update(kwargs)
is_rows.append(row)
is_df = pd.DataFrame(is_rows).sort_values("cagr", ascending=False).reset_index(drop=True)
is_top = is_df.iloc[0]
is_best_kwargs = {k: is_top[k] for k in grid[0].keys()}
# Cast numeric grid values to native types
is_best_kwargs = {
k: (int(v) if isinstance(v, (int, np.integer)) else float(v))
for k, v in is_best_kwargs.items()
}
# mom_lookback / regime_min_hold are ints
for k in ("mom_lookback", "regime_min_hold"):
is_best_kwargs[k] = int(is_best_kwargs[k])
# OOS evaluation of IS-best
strat_isbest = TrendRiderV3(**is_best_kwargs)
w_isbest = strat_isbest.generate_signals(prices)
isbest_oos = evaluate_weights(
"is_best_OOS",
w_isbest,
prices[w_isbest.columns],
transaction_cost=transaction_cost,
start=OOS_START,
end=OOS_END,
)
# Defaults on IS and OOS
default = TrendRiderV3()
w_def = default.generate_signals(prices)
def_is = evaluate_weights(
"default_IS",
w_def,
prices[w_def.columns],
transaction_cost=transaction_cost,
start=IS_START,
end=IS_END,
)
def_oos = evaluate_weights(
"default_OOS",
w_def,
prices[w_def.columns],
transaction_cost=transaction_cost,
start=OOS_START,
end=OOS_END,
)
# SPY B&H benchmark on each window
spy_w = buy_hold_weights(prices, "SPY")
qqq_w = buy_hold_weights(prices, "QQQ")
spy_is = evaluate_weights("spy_IS", spy_w, prices[spy_w.columns], 0.0, IS_START, IS_END)
spy_oos = evaluate_weights("spy_OOS", spy_w, prices[spy_w.columns], 0.0, OOS_START, OOS_END)
qqq_is = evaluate_weights("qqq_IS", qqq_w, prices[qqq_w.columns], 0.0, IS_START, IS_END)
qqq_oos = evaluate_weights("qqq_OOS", qqq_w, prices[qqq_w.columns], 0.0, OOS_START, OOS_END)
# Decay metric: how much CAGR fell from IS-fitted to OOS
return {
"is_grid": is_df,
"is_best_kwargs": is_best_kwargs,
"is_best_IS_cagr": float(is_top["cagr"]),
"is_best_OOS": isbest_oos,
"default_IS": def_is,
"default_OOS": def_oos,
"spy_IS": spy_is,
"spy_OOS": spy_oos,
"qqq_IS": qqq_is,
"qqq_OOS": qqq_oos,
}
# ---------------------------------------------------------------------------
# P0.2 — Block bootstrap on daily returns
# ---------------------------------------------------------------------------
def block_bootstrap(
returns: pd.Series,
n_boot: int = 5000,
block_len: int = 21,
seed: int = 42,
) -> pd.DataFrame:
"""Stationary block bootstrap on daily returns.
Resamples with replacement in fixed-length blocks to preserve short-horizon
autocorrelation / volatility clustering. Returns a DataFrame with columns
[cagr, sharpe, max_drawdown, calmar, final_multiple] of length n_boot.
"""
r = returns.values
n = len(r)
rng = np.random.default_rng(seed)
n_blocks = int(np.ceil(n / block_len))
# Pre-allocate
cagrs = np.empty(n_boot)
sharpes = np.empty(n_boot)
mdds = np.empty(n_boot)
finals = np.empty(n_boot)
span_years = n / 252.0
for b in range(n_boot):
starts = rng.integers(0, n - block_len + 1, size=n_blocks)
idx = (starts[:, None] + np.arange(block_len)[None, :]).ravel()[:n]
sample = r[idx]
equity = np.cumprod(1.0 + sample)
finals[b] = equity[-1]
cagrs[b] = equity[-1] ** (1.0 / span_years) - 1.0
std = sample.std(ddof=1)
sharpes[b] = (sample.mean() / std * np.sqrt(252)) if std > 0 else 0.0
running_max = np.maximum.accumulate(equity)
mdds[b] = float(np.min(equity / running_max - 1.0))
df = pd.DataFrame({
"cagr": cagrs,
"sharpe": sharpes,
"max_drawdown": mdds,
"final_multiple": finals,
})
df["calmar"] = df["cagr"] / df["max_drawdown"].abs().replace(0.0, np.nan)
return df
def bootstrap_summary(boot: pd.DataFrame) -> pd.DataFrame:
qs = [0.025, 0.05, 0.25, 0.50, 0.75, 0.95, 0.975]
summary = boot.quantile(qs).T
summary.columns = [f"p{int(q * 1000):04d}" for q in qs]
summary["mean"] = boot.mean()
summary["std"] = boot.std(ddof=1)
summary["prob_neg_cagr"] = np.nan
summary["prob_below_spy"] = np.nan
return summary
# ---------------------------------------------------------------------------
# P0.3 — De-leveraged comparison
# ---------------------------------------------------------------------------
def deleveraged_evaluations(
prices: pd.DataFrame, transaction_cost: float = 0.001
) -> dict[str, Evaluation]:
out: dict[str, Evaluation] = {}
# Standard (leveraged)
levered = TrendRiderV3()
w_lev = levered.generate_signals(prices)
out["TR_v3_leveraged"] = evaluate_weights(
"TR_v3_leveraged",
w_lev,
prices[w_lev.columns],
transaction_cost=transaction_cost,
start=IS_START,
end=OOS_END,
)
# No leverage on equity (risk_on = SPY/QQQ), commodity risk_off
nolev = TrendRiderV3(risk_on=("SPY", "QQQ"))
w_nl = nolev.generate_signals(prices)
out["TR_v3_nolev_SPYQQQ"] = evaluate_weights(
"TR_v3_nolev_SPYQQQ",
w_nl,
prices[w_nl.columns],
transaction_cost=transaction_cost,
start=IS_START,
end=OOS_END,
)
# No leverage AND cash-only risk_off (most conservative — pure timing edge on equity)
nolev_shy = TrendRiderV3(risk_on=("SPY", "QQQ"), risk_off=("SHY",))
w_nl_shy = nolev_shy.generate_signals(prices)
out["TR_v3_nolev_SHYoff"] = evaluate_weights(
"TR_v3_nolev_SHYoff",
w_nl_shy,
prices[w_nl_shy.columns],
transaction_cost=transaction_cost,
start=IS_START,
end=OOS_END,
)
# Buy-and-hold benchmarks
spy_w = buy_hold_weights(prices, "SPY")
qqq_w = buy_hold_weights(prices, "QQQ")
out["SPY_BH"] = evaluate_weights("SPY_BH", spy_w, prices[spy_w.columns], 0.0, IS_START, OOS_END)
out["QQQ_BH"] = evaluate_weights("QQQ_BH", qqq_w, prices[qqq_w.columns], 0.0, IS_START, OOS_END)
# 50/50 SPY+QQQ rebalanced (passive, no timing) — fairer "equity passive" benchmark
cols = [c for c in ["SPY", "QQQ"] if c in prices.columns]
if len(cols) == 2:
eq_w = pd.DataFrame(0.5, index=prices.index, columns=cols)
out["SPY_QQQ_5050"] = evaluate_weights(
"SPY_QQQ_5050", eq_w, prices[cols], 0.0, IS_START, OOS_END
)
return out
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description="P0 validation suite for TrendRiderV3")
parser.add_argument("--n-boot", type=int, default=5000)
parser.add_argument("--block-len", type=int, default=21)
parser.add_argument("--transaction-cost", type=float, default=0.001)
parser.add_argument("--out-dir", default="data")
args = parser.parse_args()
os.makedirs(args.out_dir, exist_ok=True)
prices = load_price_panel()
print(f"Panel: {prices.index.min().date()} to {prices.index.max().date()}, "
f"{prices.shape[1]} columns")
# ---------- P0.1 ----------
print("\n" + "=" * 78)
print("P0.1 Walk-forward / Out-of-sample")
print(f" IS = {IS_START}{IS_END}")
print(f" OOS = {OOS_START}{OOS_END}")
print("=" * 78)
wf = walk_forward(prices, transaction_cost=args.transaction_cost)
is_grid = wf["is_grid"]
is_grid.to_csv(os.path.join(args.out_dir, "p0_walkforward_isgrid.csv"), index=False)
print(f"\nGrid size: {len(is_grid)} | top 3 by IS CAGR:")
cols_show = ["cagr", "sharpe", "max_drawdown", "vol_enter", "dd_stop", "peak_enter",
"mom_lookback", "regime_min_hold", "stop_loss_pct"]
print(is_grid[cols_show].head(3).to_string(index=False))
print(f"\nIS-best params: {wf['is_best_kwargs']}")
print(f" IS CAGR : {_fmt_pct(wf['is_best_IS_cagr'])}")
print(f" OOS perf of IS-best params:")
_print_eval("IS-best (OOS)", wf["is_best_OOS"])
_print_eval("Default (IS)", wf["default_IS"])
_print_eval("Default (OOS)", wf["default_OOS"])
_print_eval("SPY B&H (IS)", wf["spy_IS"])
_print_eval("SPY B&H (OOS)", wf["spy_OOS"])
_print_eval("QQQ B&H (IS)", wf["qqq_IS"])
_print_eval("QQQ B&H (OOS)", wf["qqq_OOS"])
decay = wf["is_best_IS_cagr"] - wf["is_best_OOS"].cagr
print(f"\n Performance decay (IS→OOS) of IS-best : {_fmt_pct(decay)}")
decay_def = wf["default_IS"].cagr - wf["default_OOS"].cagr
print(f" Performance decay (IS→OOS) of default : {_fmt_pct(decay_def)}")
# ---------- P0.2 ----------
print("\n" + "=" * 78)
print("P0.2 Block bootstrap (block_len="
f"{args.block_len}, n_boot={args.n_boot})")
print("=" * 78)
default = TrendRiderV3()
weights = default.generate_signals(prices)
rets = portfolio_returns(weights, prices[weights.columns],
transaction_cost=args.transaction_cost)
rets = rets[(rets.index >= IS_START) & (rets.index <= OOS_END)]
print(f" Returns series : {len(rets)} days, "
f"mean {rets.mean()*252:.4f}, vol {rets.std(ddof=1)*np.sqrt(252):.4f}")
boot_full = block_bootstrap(
rets, n_boot=args.n_boot, block_len=args.block_len, seed=42
)
boot_full.to_csv(os.path.join(args.out_dir, "p0_bootstrap_full.csv"), index=False)
print("\nFull-sample bootstrap (2015-2026):")
print(bootstrap_summary(boot_full).round(4).to_string())
# Probability statements
spy_oos_cagr = wf["spy_OOS"].cagr
p_below_spy = float((boot_full["cagr"] < spy_oos_cagr).mean())
p_neg = float((boot_full["cagr"] < 0).mean())
p_dd_50 = float((boot_full["max_drawdown"] < -0.50).mean())
p_sharpe_below_05 = float((boot_full["sharpe"] < 0.5).mean())
print(
f"\n P(CAGR<0) = {p_neg:.3f}\n"
f" P(CAGR<SPY OOS={spy_oos_cagr:.3f}) = {p_below_spy:.3f}\n"
f" P(MaxDD<-50%) = {p_dd_50:.3f}\n"
f" P(Sharpe<0.5) = {p_sharpe_below_05:.3f}"
)
# OOS-only bootstrap (the more honest "future" estimate)
rets_oos = rets[rets.index >= OOS_START]
boot_oos = block_bootstrap(
rets_oos, n_boot=args.n_boot, block_len=args.block_len, seed=43
)
print("\nOOS-only bootstrap (2021-2026):")
print(bootstrap_summary(boot_oos).round(4).to_string())
# ---------- P0.3 ----------
print("\n" + "=" * 78)
print("P0.3 De-leveraged comparison")
print("=" * 78)
de = deleveraged_evaluations(prices, transaction_cost=args.transaction_cost)
rows = []
for name, ev in de.items():
rows.append(asdict(ev))
_print_eval(name, ev)
pd.DataFrame(rows).to_csv(os.path.join(args.out_dir, "p0_deleveraged.csv"), index=False)
# Also break by IS / OOS
print("\n Same comparison, split IS vs OOS:")
for label, (start, end) in {"IS": (IS_START, IS_END), "OOS": (OOS_START, OOS_END)}.items():
print(f" --- {label} ({start}{end}) ---")
subs = {}
# Recompute on the slice
for nm, ctor in {
"TR_v3_leveraged": TrendRiderV3(),
"TR_v3_nolev_SPYQQQ": TrendRiderV3(risk_on=("SPY", "QQQ")),
"TR_v3_nolev_SHYoff": TrendRiderV3(risk_on=("SPY", "QQQ"), risk_off=("SHY",)),
}.items():
w = ctor.generate_signals(prices)
subs[nm] = evaluate_weights(
nm, w, prices[w.columns], args.transaction_cost, start, end
)
spy_w = buy_hold_weights(prices, "SPY")
qqq_w = buy_hold_weights(prices, "QQQ")
subs["SPY_BH"] = evaluate_weights("SPY_BH", spy_w, prices[spy_w.columns], 0.0, start, end)
subs["QQQ_BH"] = evaluate_weights("QQQ_BH", qqq_w, prices[qqq_w.columns], 0.0, start, end)
for nm, ev in subs.items():
_print_eval(nm, ev)
if __name__ == "__main__":
main()