Add 28 research scripts covering DCA simulation, momentum evaluation, Sharpe optimization, trend rider analysis, and US fundamentals exploration.
283 lines
12 KiB
Python
283 lines
12 KiB
Python
"""Evaluate the industry-neutral L/S momentum strategy with realistic costs.
|
||
|
||
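Costs applied:
  * gross slippage : 30 bps × turnover (long+short rebalances)
  * borrow fee : 50 bps annualized × |short weight|, daily
  * Optional dividend on short leg: 1.5% annualized × |short weight|, daily

The figures above are the defaults of ``ls_returns`` / ``evaluate_ls``. When
run as a script, ``--borrow-bps`` defaults to 15 and ``--div-short-bps`` to 0
(prices are treated as dividend-adjusted, so the short-dividend charge is off
by default to avoid double-counting).

Outputs metrics for the L/S strategy alone and blended with TrendRiderV5.
The candidates currently built in ``main`` are ``LongHedgedStock`` variants.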
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import os
|
||
import sys
|
||
from dataclasses import asdict
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
from research.permanent_yearly import load_etfs, ETF_CACHE
|
||
from research.trend_rider_v6_eval import load_combined_panel
|
||
from research.trend_rider_robustness import (
|
||
buy_hold_weights,
|
||
evaluate_weights,
|
||
portfolio_returns,
|
||
)
|
||
from strategies.permanent import ETF_UNIVERSE
|
||
from strategies.trend_rider_v5 import TrendRiderV5
|
||
from strategies.ls_momentum import IndustryNeutralLSMomentum, fetch_sp500_sectors
|
||
from strategies.long_hedged import LongHedgedStock
|
||
|
||
|
||
# Evaluation windows (inclusive ISO dates).
# IS  = in-sample window, OOS = out-of-sample window; the "FULL" window used
# in main() runs from IS_START to OOS_END.
IS_START = "2015-01-02"
IS_END = "2020-12-31"
OOS_START = "2021-01-01"
OOS_END = "2026-05-07"
|
||
|
||
|
||
def _fmt(x: float) -> str:
    """Format a fraction as a percentage, e.g. ``0.1234`` -> ``'  12.34%'``.

    The number part is right-aligned in 7 characters with 2 decimals so that
    columns line up in the printed tables.
    """
    return f"{x * 100:7.2f}%"
|
||
|
||
|
||
def ls_returns(weights: pd.DataFrame, prices: pd.DataFrame,
               slippage_bps: float = 30.0,
               borrow_bps_annual: float = 50.0,
               div_short_bps_annual: float = 150.0) -> pd.Series:
    """Daily P&L net of slippage, borrow fee, and short-dividend pass-through.

    Parameters
    ----------
    weights : positive = long, negative = short. Reindexed onto the rows and
        columns of ``prices``; anything missing is treated as a zero weight.
    prices : price panel; daily returns are simple percentage changes with
        missing values treated as a 0% return.
    slippage_bps : cost in basis points charged on each day's turnover
        (sum of absolute weight changes across all columns).
    borrow_bps_annual : annualized borrow fee in basis points, charged daily
        (1/252 per row) on the gross short weight.
    div_short_bps_annual : annualized short-dividend charge in basis points,
        accrued the same way as the borrow fee.

    Returns
    -------
    Series indexed like ``prices`` with the net return for each row.
    """
    aligned = weights.reindex(index=prices.index, columns=prices.columns).fillna(0.0)
    rets = prices.pct_change(fill_method=None).fillna(0.0)
    # Same-row weights are applied to same-row returns; no lag is added here.
    # NOTE(review): presumably the strategies already lag their signals —
    # confirm against generate_signals().
    gross = (rets * aligned).sum(axis=1)

    # The first row has no previous weights to diff against, so it carries
    # no turnover charge.
    turnover = aligned.diff().abs().sum(axis=1).fillna(0.0)
    slip_cost = turnover * (slippage_bps / 10_000)

    # Daily borrow cost on short leg (negative weights → positive |w|)
    short_w = aligned.clip(upper=0.0).abs().sum(axis=1)
    borrow_daily = (borrow_bps_annual + div_short_bps_annual) / 10_000 / 252
    short_cost = short_w * borrow_daily

    return gross - slip_cost - short_cost
|
||
|
||
|
||
def evaluate_ls(label: str, weights: pd.DataFrame, prices: pd.DataFrame,
                start: str, end: str,
                slippage_bps: float = 30.0,
                borrow_bps_annual: float = 50.0,
                div_short_bps_annual: float = 150.0) -> dict | None:
    """Custom evaluator that handles negative weights and L/S costs.

    Net daily returns come from ``ls_returns`` and are restricted to the
    inclusive window ``[start, end]``.

    Returns
    -------
    ``None`` when no rows fall inside the window; otherwise a dict with the
    keys ``label``, ``start``, ``end``, ``days``, ``cagr``, ``volatility``,
    ``sharpe``, ``max_drawdown``, ``calmar``, ``final_multiple``,
    ``switches``, ``avg_daily_turnover``, ``avg_long``, ``avg_short`` and
    ``rets`` (the windowed net return series).
    """
    rets = ls_returns(weights, prices, slippage_bps, borrow_bps_annual,
                      div_short_bps_annual)
    rets = rets[(rets.index >= start) & (rets.index <= end)]
    if rets.empty:
        return None

    eq = (1 + rets).cumprod()
    # Floor the span at one trading day so a single-row window cannot divide
    # by zero when annualizing.
    span = max((rets.index[-1] - rets.index[0]).days / 365.25, 1 / 252)
    cagr = float(eq.iloc[-1] ** (1 / span) - 1)
    daily_sd = rets.std(ddof=1)
    vol = float(daily_sd * np.sqrt(252))
    # A NaN std (single row) compares False here and yields a Sharpe of 0.0.
    sharpe = float(rets.mean() / daily_sd * np.sqrt(252)) if daily_sd > 0 else 0.0
    dd = eq / eq.cummax() - 1
    mdd = float(dd.min())

    aligned = weights.reindex(index=prices.index, columns=prices.columns).fillna(0.0)
    in_window = (aligned.index >= start) & (aligned.index <= end)
    # Difference on the full history *before* slicing, so a rebalance on the
    # window's first day is counted here just as ls_returns charged for it.
    turn = aligned.diff().abs().sum(axis=1).fillna(0.0)[in_window]
    aligned = aligned.loc[in_window]
    long_w = aligned.clip(lower=0.0).sum(axis=1)
    short_w = aligned.clip(upper=0.0).abs().sum(axis=1)

    # Construct an Evaluation-like dict
    return {
        "label": label,
        "start": str(rets.index[0].date()),
        "end": str(rets.index[-1].date()),
        "days": int(len(rets)),
        "cagr": cagr,
        "volatility": vol,
        "sharpe": sharpe,
        "max_drawdown": mdd,
        "calmar": float(cagr / abs(mdd)) if mdd < 0 else 0.0,
        "final_multiple": float(eq.iloc[-1]),
        # Days on which more than 1% of the book changed hands.
        "switches": int((turn > 0.01).sum()),
        "avg_daily_turnover": float(turn.mean()),
        "avg_long": float(long_w.mean()),
        "avg_short": float(short_w.mean()),
        "rets": rets,
    }
|
||
|
||
|
||
def print_eval(d: dict, prefix: str = "") -> None:
    """Print one summary line for an evaluation dict.

    ``d`` must provide the keys ``label``, ``cagr``, ``volatility``,
    ``sharpe``, ``max_drawdown``, ``calmar``, ``final_multiple``,
    ``avg_long`` and ``avg_short`` (the shape returned by ``evaluate_ls``).
    ``prefix`` is inserted before the label.
    """
    print(
        f" {prefix}{d['label']:<32s} "
        f"CAGR {_fmt(d['cagr'])} Vol {_fmt(d['volatility'])} "
        f"Sharpe {d['sharpe']:5.2f} MDD {_fmt(d['max_drawdown'])} "
        f"Calmar {d['calmar']:5.2f} X {d['final_multiple']:6.2f} "
        f"L {d['avg_long']*100:5.1f}% S {d['avg_short']*100:5.1f}%"
    )
|
||
|
||
|
||
def annual_returns(rets: pd.Series) -> pd.Series:
    """Compound daily returns into one return per calendar year.

    ``rets`` must have a datetime index; the result is indexed by year.
    """
    growth = 1.0 + rets
    return growth.groupby(rets.index.year).prod() - 1.0
|
||
|
||
|
||
def _blend_stats(r: pd.Series) -> tuple:
    """Return (CAGR, annualized vol, Sharpe, max drawdown, Calmar) for daily returns ``r``."""
    eq = (1 + r).cumprod()
    span = max((r.index[-1] - r.index[0]).days / 365.25, 1 / 252)
    cagr = eq.iloc[-1] ** (1 / span) - 1
    daily_sd = r.std(ddof=1)
    vol = daily_sd * np.sqrt(252)
    sharpe = r.mean() / daily_sd * np.sqrt(252) if daily_sd > 0 else 0
    mdd = float((eq / eq.cummax() - 1).min())
    calmar = cagr / abs(mdd) if mdd < 0 else 0
    return cagr, vol, sharpe, mdd, calmar


def main() -> None:
    """Run the L/S evaluation and print all report tables.

    Steps: parse cost arguments, load the combined price panel, build the
    ``LongHedgedStock`` candidates, evaluate each over the FULL / IS / OOS
    windows, pick the candidate with the best full-period Sharpe, then report
    its correlation with TrendRiderV5, several fixed-weight blends of the two
    return streams, and per-year returns.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--slippage-bps", type=float, default=30.0)
    parser.add_argument("--borrow-bps", type=float, default=15.0)
    # auto_adjust=True yfinance already includes dividends; do not double-count
    parser.add_argument("--div-short-bps", type=float, default=0.0)
    # NOTE(review): --out-dir is accepted but nothing below reads it.
    parser.add_argument("--out-dir", default="data")
    args = parser.parse_args()
    costs = (args.slippage_bps, args.borrow_bps, args.div_short_bps)

    panel = load_combined_panel()
    etf_set = (set(ETF_UNIVERSE)
               | {"QQQ", "TQQQ", "UPRO", "GLD", "DBC", "SHY", "SPY",
                  "YINN", "CHAU", "7200.HK", "7500.HK"})
    stock_universe = [c for c in panel.columns if c not in etf_set]
    print(f"Stock universe: {len(stock_universe)} names")

    sector_df = fetch_sp500_sectors()
    sector_map = sector_df["GICS Sector"]
    coverage = sector_map.reindex(stock_universe).notna().sum()
    print(f"Sector coverage: {coverage} / {len(stock_universe)}")

    # ---------- #1 + #2: smaller top_n + regime gate ----------
    def hedged(top_n, hedge_ratio=1.0, **extra):
        # All candidates share the signal and universe; only top_n, the hedge
        # ratio and the optional regime gate vary. regime_gate is passed only
        # when requested so the strategy's own default applies otherwise.
        return LongHedgedStock(
            signal_name="rec_mfilt+deep_upvol", top_n=top_n,
            hedge_ratio=hedge_ratio, stock_universe=stock_universe, **extra)

    candidates = {
        # Baseline from prior run
        "Hedged top10 hr1.0 (baseline)": hedged(10),
        # #1 — concentrated long leg
        "Hedged top5 hr1.0": hedged(5),
        "Hedged top7 hr1.0": hedged(7),
        # #2 — regime gate (only on when SPY > MA200)
        "Hedged top10 hr1.0 +regime": hedged(10, regime_gate=True),
        # #1 + #2 combined
        "Hedged top5 hr1.0 +regime": hedged(5, regime_gate=True),
        "Hedged top7 hr1.0 +regime": hedged(7, regime_gate=True),
        # Smaller top_n with partial hedge
        "Hedged top5 hr0.7 +regime": hedged(5, hedge_ratio=0.7, regime_gate=True),
    }

    weights_map = {}
    print("\n=== Generating signals ===")
    for name, strat in candidates.items():
        print(f" ... {name}")
        # LongHedgedStock needs the full panel (stocks + SPY); IndustryNeutral
        # only needs stocks. Generate on appropriate slice.
        if isinstance(strat, LongHedgedStock):
            weights_map[name] = strat.generate_signals(panel)
        else:
            weights_map[name] = strat.generate_signals(panel[stock_universe])

    # Re-attach every weight frame to the full panel once, up front.
    full_weights = {name: w.reindex(columns=panel.columns).fillna(0.0)
                    for name, w in weights_map.items()}

    print(f"\n=== L/S alone (slippage={args.slippage_bps}bps, "
          f"borrow={args.borrow_bps}bps, div_short={args.div_short_bps}bps) ===")
    ls_windows = (
        ("FULL", "\n --- FULL (2015 → 2026-05) ---", IS_START, OOS_END),
        ("IS", "\n --- IS (2015 → 2020) ---", IS_START, IS_END),
        ("OOS", "\n --- OOS (2021 → 2026-05) ---", OOS_START, OOS_END),
    )
    rets_map = {}
    full_sharpe = {}
    for window_name, header, start, end in ls_windows:
        print(header)
        for name, w_full in full_weights.items():
            d = evaluate_ls(name, w_full, panel, start, end, *costs)
            if d is None:
                # evaluate_ls returns None when the window holds no rows.
                print(f" {name}: no returns in window, skipped")
                continue
            if window_name == "FULL":
                rets_map[name] = d["rets"]
                full_sharpe[name] = d["sharpe"]
            print_eval(d)

    if not rets_map:
        print("\n No L/S candidate produced full-period returns; nothing to blend.")
        return

    # ---------- V5 baseline returns ----------
    print("\n=== V5 baseline (for blending) ===")
    v5 = TrendRiderV5()
    v5_w = v5.generate_signals(panel)
    v5_rets = portfolio_returns(v5_w, panel[v5_w.columns], 0.001)

    # Pick best L/S by full-period Sharpe (already computed by evaluate_ls on
    # the same windowed series; ties keep the first candidate).
    best_ls = max(full_sharpe, key=full_sharpe.get)
    print(f"\n Best L/S by full-period Sharpe : {best_ls}")
    best_ls_rets = rets_map[best_ls]

    # ---------- Correlation ----------
    common = v5_rets.index.intersection(best_ls_rets.index)
    common = common[(common >= pd.Timestamp(IS_START)) & (common <= pd.Timestamp(OOS_END))]
    v5r, lsr = v5_rets.loc[common], best_ls_rets.loc[common]
    masks = {
        window_name: (common >= pd.Timestamp(s)) & (common <= pd.Timestamp(e))
        for window_name, (s, e) in {"FULL": (IS_START, OOS_END),
                                    "IS": (IS_START, IS_END),
                                    "OOS": (OOS_START, OOS_END)}.items()
    }
    corr_full = v5r.corr(lsr)
    corr_is = v5r[masks["IS"]].corr(lsr[masks["IS"]])
    corr_oos = v5r[masks["OOS"]].corr(lsr[masks["OOS"]])
    print(f" V5 vs {best_ls} correlations:")
    print(f" FULL : {corr_full:6.3f}")
    print(f" IS : {corr_is:6.3f}")
    print(f" OOS : {corr_oos:6.3f}")

    # ---------- Blends ----------
    print("\n=== V5 + L/S blends (rets-level) ===")
    print(" Window Mix CAGR Vol Sharpe MDD Calmar")
    for w5, wls in [(0.50, 0.50), (0.70, 0.30), (0.80, 0.20),
                    (0.60, 0.40), (0.40, 0.60)]:
        for window_name, mask in masks.items():
            r = w5 * v5r[mask] + wls * lsr[mask]
            if r.empty:
                continue
            cagr, vol, sharpe, mdd, calmar = _blend_stats(r)
            print(f" [{window_name:<4s}] V5={w5:.0%}+LS={wls:.0%} "
                  f"{cagr*100:6.2f}% {vol*100:5.2f}% {sharpe:5.2f} "
                  f"{mdd*100:6.2f}% {calmar:5.2f}")
        # Blank line between mixes (presumably per-mix; original indentation
        # was lost in extraction).
        print()

    # ---------- Annual returns ----------
    print("\n=== Annual returns (best L/S vs V5) ===")
    a_v5 = annual_returns(v5r).rename("V5")
    a_ls = annual_returns(lsr).rename(best_ls)
    a_blend50 = annual_returns(0.5 * v5r + 0.5 * lsr).rename("Blend 50/50")
    a_blend70 = annual_returns(0.7 * v5r + 0.3 * lsr).rename("Blend 70/30 V5/LS")
    annuals = pd.concat([a_v5, a_ls, a_blend50, a_blend70], axis=1)
    annuals = annuals.map(lambda x: f"{x*100:7.1f}%" if pd.notna(x) else "")
    print(annuals.to_string())
|
||
|
||
|
||
# Run the evaluation only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|