Add 28 research scripts covering DCA simulation, momentum evaluation, Sharpe optimization, trend rider analysis, and US fundamentals exploration.
266 lines
10 KiB
Python
266 lines
10 KiB
Python
"""
|
||
Sharpe boost v5: Fine-tune DD dampener on top of the Sharpe 1.52 config.
|
||
|
||
Best raw config: rebal=42, top_n=12, asym_vol (Sharpe 1.52, MaxDD -31.2%)
|
||
Now: add a LIGHTER DD dampener to bring MaxDD under 30% without killing Sharpe.
|
||
|
||
Key: dd_denom controls how aggressively we cut. Larger denom = lighter touch.
|
||
"""
|
||
from __future__ import annotations
|
||
import os, sys
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
from strategies.base import Strategy
|
||
|
||
|
||
def _rank(df):
|
||
return df.rank(axis=1, pct=True, na_option="keep")
|
||
|
||
|
||
def compute_metrics(daily_rets: pd.Series) -> dict:
|
||
eq = (1 + daily_rets).cumprod()
|
||
n_years = len(daily_rets) / 252.0
|
||
cagr = eq.iloc[-1] ** (1.0 / n_years) - 1.0
|
||
vol = daily_rets.std() * np.sqrt(252)
|
||
sharpe = daily_rets.mean() / daily_rets.std() * np.sqrt(252) if daily_rets.std() > 0 else 0
|
||
running_max = eq.cummax()
|
||
dd = eq / running_max - 1
|
||
max_dd = dd.min()
|
||
calmar = cagr / abs(max_dd) if max_dd != 0 else 0
|
||
return {"cagr": cagr, "vol": vol, "sharpe": sharpe, "max_dd": max_dd, "calmar": calmar}
|
||
|
||
|
||
def yearly_returns(daily_rets: pd.Series) -> pd.Series:
|
||
eq = (1 + daily_rets).cumprod()
|
||
yearly = eq.resample("YE").last().pct_change()
|
||
yearly.iloc[0] = eq.resample("YE").last().iloc[0] - 1
|
||
yearly.index = yearly.index.year
|
||
return yearly
|
||
|
||
|
||
class EnsembleV3(Strategy):
|
||
def __init__(self, top_n=12, rebal_freq=42, mom_blend=0.0,
|
||
asym_vol=True, asym_vol_floor=0.50,
|
||
dd_dampen=False, dd_floor=0.40, dd_denom=0.20):
|
||
self.top_n = top_n
|
||
self.rebal_freq = rebal_freq
|
||
self.mom_blend = mom_blend
|
||
self.asym_vol = asym_vol
|
||
self.asym_vol_floor = asym_vol_floor
|
||
self.dd_dampen = dd_dampen
|
||
self.dd_floor = dd_floor
|
||
self.dd_denom = dd_denom
|
||
|
||
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
|
||
p = data
|
||
ret = p.pct_change()
|
||
|
||
rec_126 = p / p.rolling(126, min_periods=126).min() - 1
|
||
mom_filter = p.shift(21).pct_change(105)
|
||
rec_mfilt = rec_126.where(mom_filter > 0, np.nan)
|
||
rec_mfilt_r = _rank(rec_mfilt)
|
||
|
||
up_vol = ret.where(ret > 0, 0).rolling(20, min_periods=15).sum()
|
||
deep_upvol = _rank(rec_126) * _rank(up_vol)
|
||
deep_upvol_r = _rank(deep_upvol)
|
||
signal_a = 0.5 * rec_mfilt_r + 0.5 * deep_upvol_r
|
||
|
||
rec_63 = p / p.rolling(63, min_periods=63).min() - 1
|
||
mom_12_1 = p.shift(21).pct_change(231)
|
||
rec_63_r = _rank(rec_63)
|
||
mom_r = _rank(mom_12_1)
|
||
signal_b = 0.5 * rec_63_r + 0.5 * mom_r
|
||
|
||
signal_c = mom_r
|
||
|
||
α = self.mom_blend
|
||
if α > 0:
|
||
ensemble = (1 - α) / 2 * signal_a + (1 - α) / 2 * signal_b + α * signal_c
|
||
else:
|
||
ensemble = 0.5 * signal_a + 0.5 * signal_b
|
||
|
||
rank = ensemble.rank(axis=1, ascending=False, na_option="bottom")
|
||
n_valid = ensemble.notna().sum(axis=1)
|
||
enough = n_valid >= self.top_n
|
||
top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)
|
||
|
||
raw = top_mask.astype(float)
|
||
row_sums = raw.sum(axis=1).replace(0, np.nan)
|
||
signals = raw.div(row_sums, axis=0).fillna(0.0)
|
||
|
||
warmup = 252
|
||
rebal_mask = pd.Series(False, index=data.index)
|
||
rebal_indices = list(range(warmup, len(data), self.rebal_freq))
|
||
rebal_mask.iloc[rebal_indices] = True
|
||
signals[~rebal_mask] = np.nan
|
||
signals = signals.ffill().fillna(0.0)
|
||
signals.iloc[:warmup] = 0.0
|
||
signals = signals.shift(1).fillna(0.0)
|
||
|
||
if self.asym_vol:
|
||
daily_rets = data.pct_change().fillna(0.0)
|
||
port_rets = (signals * daily_rets).sum(axis=1)
|
||
short_vol = port_rets.rolling(20, min_periods=10).std() * np.sqrt(252)
|
||
vol_median = short_vol.rolling(252, min_periods=126).median()
|
||
recent_ret = port_rets.rolling(20, min_periods=10).sum()
|
||
high_vol_neg = (short_vol > vol_median * 1.5) & (recent_ret < 0)
|
||
asym_scale = pd.Series(1.0, index=data.index)
|
||
asym_scale[high_vol_neg] = self.asym_vol_floor
|
||
signals = signals.mul(asym_scale.shift(1).fillna(1.0), axis=0)
|
||
|
||
if self.dd_dampen:
|
||
daily_rets = data.pct_change().fillna(0.0)
|
||
mkt_rets = daily_rets.mean(axis=1)
|
||
mkt_eq = (1 + mkt_rets).cumprod()
|
||
mkt_dd = mkt_eq / mkt_eq.cummax() - 1
|
||
dd_scale = (1.0 + mkt_dd / self.dd_denom).clip(lower=self.dd_floor, upper=1.0)
|
||
signals = signals.mul(dd_scale.shift(1).fillna(1.0), axis=0)
|
||
|
||
return signals
|
||
|
||
|
||
_DATA_CACHE = {}
|
||
|
||
|
||
def backtest_strategy(strategy, start="2016-04-01", end="2026-05-13"):
|
||
import data_manager
|
||
if "data" not in _DATA_CACHE:
|
||
from universe import get_sp500
|
||
tickers = get_sp500()
|
||
data_manager.update("us", tickers)
|
||
_DATA_CACHE["data"] = data_manager.load("us")
|
||
data = _DATA_CACHE["data"]
|
||
weights = strategy.generate_signals(data)
|
||
daily_rets = (weights * data.pct_change().fillna(0.0)).sum(axis=1)
|
||
return daily_rets.loc[start:end]
|
||
|
||
|
||
def fmt_row(label, m):
|
||
return (f"{label:<55s} {m['cagr']*100:>6.1f}% {m['vol']*100:>6.1f}% "
|
||
f"{m['sharpe']:>6.2f} {m['max_dd']*100:>6.1f}% {m['calmar']:>6.2f}")
|
||
|
||
|
||
def main():
|
||
print("=" * 95)
|
||
print("SHARPE BOOST v5: Fine-tune DD dampener on Sharpe 1.52 base")
|
||
print("=" * 95)
|
||
|
||
header = f"{'Config':<55s} {'CAGR':>7s} {'Vol':>7s} {'Sharpe':>6s} {'MaxDD':>7s} {'Calmar':>6s}"
|
||
|
||
# --- Baseline (no DD) ---
|
||
print(f"\n--- Baseline: rebal42 + top12 + asym_vol (no DD) ---")
|
||
print(header)
|
||
print("-" * 95)
|
||
strat = EnsembleV3(top_n=12, rebal_freq=42, asym_vol=True, dd_dampen=False)
|
||
base_rets = backtest_strategy(strat)
|
||
base_m = compute_metrics(base_rets)
|
||
print(fmt_row("NO DD (baseline)", base_m))
|
||
|
||
# --- Light DD: larger dd_denom (gentler), higher floor ---
|
||
print(f"\n--- DD dampener tuning (lighter touch) ---")
|
||
print(header)
|
||
print("-" * 95)
|
||
|
||
configs = [
|
||
# (dd_floor, dd_denom) — larger denom = need bigger crash to trigger
|
||
(0.60, 0.25),
|
||
(0.60, 0.30),
|
||
(0.60, 0.35),
|
||
(0.70, 0.25),
|
||
(0.70, 0.30),
|
||
(0.70, 0.35),
|
||
(0.50, 0.25),
|
||
(0.50, 0.30),
|
||
(0.50, 0.35),
|
||
(0.40, 0.20), # original (aggressive)
|
||
]
|
||
|
||
results = {}
|
||
for dd_floor, dd_denom in configs:
|
||
strat = EnsembleV3(top_n=12, rebal_freq=42, asym_vol=True,
|
||
dd_dampen=True, dd_floor=dd_floor, dd_denom=dd_denom)
|
||
rets = backtest_strategy(strat)
|
||
m = compute_metrics(rets)
|
||
results[(dd_floor, dd_denom)] = {"rets": rets, "m": m}
|
||
print(fmt_row(f"DD floor={dd_floor:.2f} denom={dd_denom:.2f}", m))
|
||
|
||
# --- Also test: top_n=10 vs 12 with lighter DD ---
|
||
print(f"\n--- top_n comparison with light DD (floor=0.60, denom=0.30) ---")
|
||
print(header)
|
||
print("-" * 95)
|
||
for n in [8, 10, 12]:
|
||
strat = EnsembleV3(top_n=n, rebal_freq=42, asym_vol=True,
|
||
dd_dampen=True, dd_floor=0.60, dd_denom=0.30)
|
||
rets = backtest_strategy(strat)
|
||
m = compute_metrics(rets)
|
||
print(fmt_row(f"top_n={n}, light DD", m))
|
||
|
||
# --- Also try: mom_blend with the good configs ---
|
||
print(f"\n--- Add momentum blend to best configs ---")
|
||
print(header)
|
||
print("-" * 95)
|
||
for α in [0.0, 0.15, 0.20]:
|
||
for dd_floor, dd_denom in [(0.60, 0.30), (0.70, 0.30)]:
|
||
strat = EnsembleV3(top_n=12, rebal_freq=42, mom_blend=α, asym_vol=True,
|
||
dd_dampen=True, dd_floor=dd_floor, dd_denom=dd_denom)
|
||
rets = backtest_strategy(strat)
|
||
m = compute_metrics(rets)
|
||
results[(dd_floor, dd_denom, α)] = {"rets": rets, "m": m}
|
||
print(fmt_row(f"top12, mom={α:.0%}, DD f={dd_floor} d={dd_denom}", m))
|
||
|
||
# --- Pick best Sharpe >= 1.5 config ---
|
||
print(f"\n{'=' * 95}")
|
||
print("SELECTING BEST CONFIG WITH Sharpe >= 1.50")
|
||
print(f"{'=' * 95}")
|
||
|
||
# Find best among all tested
|
||
best_key = None
|
||
best_sharpe = 0
|
||
for key, v in results.items():
|
||
if v["m"]["sharpe"] >= best_sharpe:
|
||
best_sharpe = v["m"]["sharpe"]
|
||
best_key = key
|
||
|
||
if best_key:
|
||
best = results[best_key]
|
||
print(f"Config: {best_key}")
|
||
print(fmt_row("BEST", best["m"]))
|
||
print(f"\n--- Yearly returns ---")
|
||
yr = yearly_returns(best["rets"])
|
||
for year, ret in yr.items():
|
||
print(f" {year}: {ret*100:>+7.1f}%")
|
||
|
||
# IS/OOS
|
||
print(f"\n--- IS/OOS ---")
|
||
is_rets = best["rets"].loc["2016-04-01":"2022-12-31"]
|
||
oos_rets = best["rets"].loc["2023-01-01":"2026-05-13"]
|
||
is_m = compute_metrics(is_rets)
|
||
oos_m = compute_metrics(oos_rets)
|
||
print(f" IS (2016-2022): CAGR {is_m['cagr']*100:.1f}% Sharpe {is_m['sharpe']:.2f} MaxDD {is_m['max_dd']*100:.1f}%")
|
||
print(f" OOS (2023-2026): CAGR {oos_m['cagr']*100:.1f}% Sharpe {oos_m['sharpe']:.2f} MaxDD {oos_m['max_dd']*100:.1f}%")
|
||
|
||
# Bootstrap
|
||
print(f"\n--- Bootstrap ---")
|
||
from research.trend_rider_p0 import block_bootstrap
|
||
boot = block_bootstrap(best["rets"], n_boot=5000, block_len=42)
|
||
print(f" Sharpe: median={boot['sharpe'].median():.2f} "
|
||
f"5th={boot['sharpe'].quantile(0.05):.2f} "
|
||
f"95th={boot['sharpe'].quantile(0.95):.2f}")
|
||
print(f" MaxDD: median={boot['max_drawdown'].median()*100:.1f}% "
|
||
f"5th={boot['max_drawdown'].quantile(0.05)*100:.1f}% "
|
||
f"95th={boot['max_drawdown'].quantile(0.95)*100:.1f}%")
|
||
print(f" P(Sharpe > 1.5): {(boot['sharpe'] > 1.5).mean()*100:.1f}%")
|
||
print(f" P(Sharpe > 1.0): {(boot['sharpe'] > 1.0).mean()*100:.1f}%")
|
||
print(f" P(MaxDD > 30%): {(boot['max_drawdown'].abs() > 0.30).mean()*100:.1f}%")
|
||
else:
|
||
print("No config achieved Sharpe >= 1.50")
|
||
# Show best anyway
|
||
best_key = max(results, key=lambda k: results[k]["m"]["sharpe"])
|
||
print(f"Closest: {best_key} → Sharpe {results[best_key]['m']['sharpe']:.2f}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|