Add 28 research scripts covering DCA simulation, momentum evaluation, Sharpe optimization, trend rider analysis, and US fundamentals exploration.
279 lines
11 KiB
Python
279 lines
11 KiB
Python
"""
Sharpe boost v4: a long holding period (42d rebalance) is the key lever.

Key finding from v3: rebal=42d → Sharpe 1.42 (vs 1.34 for 21d).
Why: monthly (21d) rebalancing causes turnover-induced noise. The
recovery/momentum signals are slow-moving (126d lookback), so weekly,
biweekly, and even monthly rebalancing is too fast.
A 42d rebalance lets winners run.

Now test: rebal=42d + concentration + mom_blend + asym_vol + DD dampener
"""
|
||
from __future__ import annotations
|
||
import os, sys
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
from strategies.base import Strategy
|
||
|
||
|
||
def _rank(df):
    """Return the row-wise percentile rank of every cell in *df*.

    Each row is ranked independently across its columns and scaled into
    (0, 1]; NaN cells stay NaN and do not count towards the row's total.
    """
    return df.rank(axis="columns", pct=True, na_option="keep")
|
||
|
||
|
||
def compute_metrics(daily_rets: pd.Series) -> dict:
    """Summarise a daily simple-return series as annualised performance stats.

    Annualisation assumes 252 observations per year and a zero risk-free rate.

    Args:
        daily_rets: Daily simple returns (e.g. 0.01 for +1%).

    Returns:
        Dict with keys ``cagr``, ``vol``, ``sharpe``, ``max_dd`` (<= 0) and
        ``calmar``. ``sharpe`` is 0 when the series has no (or undefined)
        dispersion, ``calmar`` is 0 when there is no drawdown, and every
        metric is 0.0 for an empty series.
    """
    # An empty window (e.g. a date slice outside the data range) would
    # otherwise fail on eq.iloc[-1] and on the division by n_years == 0.
    if daily_rets.empty:
        return {"cagr": 0.0, "vol": 0.0, "sharpe": 0.0, "max_dd": 0.0, "calmar": 0.0}

    periods_per_year = 252
    annualiser = np.sqrt(periods_per_year)

    eq = (1 + daily_rets).cumprod()
    n_years = len(daily_rets) / float(periods_per_year)
    cagr = eq.iloc[-1] ** (1.0 / n_years) - 1.0

    # Compute the standard deviation once; it feeds both vol and Sharpe.
    daily_std = daily_rets.std()
    vol = daily_std * annualiser
    # NaN std (single observation) compares False here and falls back to 0.
    sharpe = daily_rets.mean() / daily_std * annualiser if daily_std > 0 else 0

    # Drawdown is measured against the running peak of the equity curve.
    dd = eq / eq.cummax() - 1
    max_dd = dd.min()
    calmar = cagr / abs(max_dd) if max_dd != 0 else 0

    return {"cagr": cagr, "vol": vol, "sharpe": sharpe, "max_dd": max_dd, "calmar": calmar}
|
||
|
||
|
||
def yearly_returns(daily_rets: pd.Series) -> pd.Series:
|
||
eq = (1 + daily_rets).cumprod()
|
||
yearly = eq.resample("YE").last().pct_change()
|
||
yearly.iloc[0] = eq.resample("YE").last().iloc[0] - 1
|
||
yearly.index = yearly.index.year
|
||
return yearly
|
||
|
||
|
||
class EnsembleV3(Strategy):
    """Equal-weight top-N portfolio ranked by a recovery/momentum ensemble.

    Levers exposed by the constructor:

    * ``top_n`` - number of names held (concentration).
    * ``rebal_freq`` - rows between rebalances; weights are held in between.
    * ``mom_blend`` - share of the ensemble score given to pure momentum.
    * ``asym_vol`` / ``asym_vol_floor`` - scale exposure to the floor after a
      stretch that was both unusually volatile and loss-making.
    * ``dd_dampen`` / ``dd_floor`` / ``dd_denom`` - scale exposure by
      ``clip(1 + market_drawdown / dd_denom, dd_floor, 1)``.
    """

    def __init__(self, top_n=10, rebal_freq=42, mom_blend=0.0,
                 asym_vol=False, asym_vol_floor=0.50,
                 dd_dampen=False, dd_floor=0.40, dd_denom=0.20):
        # Selection and timing.
        self.top_n = top_n
        self.rebal_freq = rebal_freq
        self.mom_blend = mom_blend
        # Asymmetric volatility cut.
        self.asym_vol = asym_vol
        self.asym_vol_floor = asym_vol_floor
        # Market drawdown dampener.
        self.dd_dampen = dd_dampen
        self.dd_floor = dd_floor
        self.dd_denom = dd_denom

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Turn a price panel into lagged portfolio weights.

        Args:
            data: Price panel; assumed rows = dates, columns = tickers
                (TODO confirm against the data loader).

        Returns:
            DataFrame shaped like *data* holding the weight of each column on
            each row. Weights are shifted one row so a row's weight only
            depends on information from earlier rows.
        """
        returns = data.pct_change()

        score = self._ensemble_score(data, returns)
        weights = self._equal_weight_top_n(score)
        weights = self._hold_between_rebalances(weights)

        # === Asymmetric vol: only cut in high-vol + negative return ===
        if self.asym_vol:
            weights = weights.mul(self._asym_vol_scale(weights, returns), axis=0)

        # === Market DD dampener ===
        if self.dd_dampen:
            weights = weights.mul(self._market_dd_scale(returns), axis=0)

        return weights

    def _ensemble_score(self, prices, returns):
        """Blend the recovery and momentum rank signals into one score per cell."""
        # Both momentum measures skip the latest 21 rows.
        lagged = prices.shift(21)

        # === Signal A: rec_mfilt + deep_upvol ===
        recovery_126 = prices / prices.rolling(126, min_periods=126).min() - 1
        trend_ok = lagged.pct_change(105) > 0
        filtered_recovery_rank = _rank(recovery_126.where(trend_ok, np.nan))

        # Sum of positive daily returns over the last 20 rows.
        up_moves = returns.where(returns > 0, 0).rolling(20, min_periods=15).sum()
        deep_upvol_rank = _rank(_rank(recovery_126) * _rank(up_moves))
        signal_a = 0.5 * filtered_recovery_rank + 0.5 * deep_upvol_rank

        # === Signal B: Recovery 63d + 12-1 momentum ===
        recovery_63 = prices / prices.rolling(63, min_periods=63).min() - 1
        momentum_rank = _rank(lagged.pct_change(231))
        signal_b = 0.5 * _rank(recovery_63) + 0.5 * momentum_rank

        # === Ensemble === (Signal C, pure momentum, is momentum_rank itself)
        blend = self.mom_blend
        if blend > 0:
            core_weight = (1 - blend) / 2
            return core_weight * signal_a + core_weight * signal_b + blend * momentum_rank
        return 0.5 * signal_a + 0.5 * signal_b

    def _equal_weight_top_n(self, score):
        """Give equal weight to each row's top_n scores; rows with too few scores get 0."""
        position = score.rank(axis=1, ascending=False, na_option="bottom")
        has_enough = score.notna().sum(axis=1) >= self.top_n
        picked = ((position <= self.top_n) & has_enough.values.reshape(-1, 1)).astype(float)

        # Rows with no picks would divide by zero; route them through NaN to 0.
        picks_per_row = picked.sum(axis=1).replace(0, np.nan)
        return picked.div(picks_per_row, axis=0).fillna(0.0)

    def _hold_between_rebalances(self, weights):
        """Keep weights only on rebalance rows, carry them forward, then lag one row."""
        warmup = 252
        is_rebal = pd.Series(False, index=weights.index)
        is_rebal.iloc[list(range(warmup, len(weights), self.rebal_freq))] = True

        # Blank every non-rebalance row so ffill carries the last target forward.
        weights[~is_rebal] = np.nan
        held = weights.ffill().fillna(0.0)
        held.iloc[:warmup] = 0.0
        return held.shift(1).fillna(0.0)  # PIT

    def _asym_vol_scale(self, weights, returns):
        """Per-row exposure scale: the floor after high-vol losing stretches, else 1."""
        port_rets = (weights * returns.fillna(0.0)).sum(axis=1)
        recent = port_rets.rolling(20, min_periods=10)

        short_vol = recent.std() * np.sqrt(252)
        vol_median = short_vol.rolling(252, min_periods=126).median()
        stressed = (short_vol > vol_median * 1.5) & (recent.sum() < 0)

        scale = pd.Series(1.0, index=weights.index)
        scale[stressed] = self.asym_vol_floor
        # Lag one row so the cut reacts to the previous row's state.
        return scale.shift(1).fillna(1.0)

    def _market_dd_scale(self, returns):
        """Per-row exposure scale from the drawdown of the equal-weight market."""
        market_rets = returns.fillna(0.0).mean(axis=1)
        market_equity = (1 + market_rets).cumprod()
        drawdown = market_equity / market_equity.cummax() - 1

        scale = (1.0 + drawdown / self.dd_denom).clip(lower=self.dd_floor, upper=1.0)
        # Lag one row so the dampener reacts to the previous row's drawdown.
        return scale.shift(1).fillna(1.0)
|
||
|
||
|
||
# Module-level memo for the loaded price data: backtest_strategy stores it
# under the "data" key on first use and reuses it on every later call.
_DATA_CACHE = {}
|
||
|
||
|
||
def backtest_strategy(strategy, start="2016-04-01", end="2026-05-13"):
    """Run *strategy* over the cached "us" data and return its daily returns.

    The data is refreshed and loaded through ``data_manager`` on the first
    call only; later calls reuse the copy held in ``_DATA_CACHE``.

    Args:
        strategy: Object exposing ``generate_signals(data)`` that returns
            weights aligned with the data.
        start: First label of the returned window (inclusive ``.loc`` slice).
        end: Last label of the returned window (inclusive ``.loc`` slice).

    Returns:
        Series of portfolio daily returns restricted to ``start:end``.
    """
    import data_manager

    try:
        prices = _DATA_CACHE["data"]
    except KeyError:
        from universe import get_sp500

        data_manager.update("us", get_sp500())
        prices = _DATA_CACHE["data"] = data_manager.load("us")

    weights = strategy.generate_signals(prices)
    asset_returns = prices.pct_change().fillna(0.0)
    portfolio_returns = weights.mul(asset_returns).sum(axis=1)
    return portfolio_returns.loc[start:end]
|
||
|
||
|
||
def fmt_row(label, m):
    """Format one results-table row from a label and a metrics dict.

    Args:
        label: Row description, left-aligned and padded to 50 characters.
        m: Mapping with ``cagr``, ``vol``, ``sharpe``, ``max_dd`` and
            ``calmar``; the first, second and fourth are shown as percentages.

    Returns:
        The row as a single space-separated string.
    """
    cells = [
        f"{label:<50s}",
        f"{m['cagr']*100:>6.1f}%",
        f"{m['vol']*100:>6.1f}%",
        f"{m['sharpe']:>6.2f}",
        f"{m['max_dd']*100:>6.1f}%",
        f"{m['calmar']:>6.2f}",
    ]
    return " ".join(cells)
|
||
|
||
|
||
def _print_section(title, header):
    """Print a section banner, the metrics table header and a rule."""
    print(f"\n--- {title} ---")
    print(header)
    print("-" * 90)


def _run_config(label, **strategy_kwargs):
    """Backtest one EnsembleV3 configuration and print its metrics row.

    Returns:
        ``(daily_returns, metrics)`` so the caller can keep the best run.
    """
    rets = backtest_strategy(EnsembleV3(**strategy_kwargs))
    metrics = compute_metrics(rets)
    print(fmt_row(label, metrics))
    return rets, metrics


def main():
    """Run the v4 parameter sweeps, then analyse the best full combo.

    Prints one metrics table per sweep (concentration, momentum blend,
    rebalance frequency, DD dampener, asymmetric vol, full combos). For the
    full combo with the highest Sharpe it then prints yearly returns, an
    IS/OOS split and a block-bootstrap summary.
    """
    print("=" * 90)
    print("SHARPE BOOST v4: rebal=42d as key lever + combos")
    print("=" * 90)

    header = f"{'Config':<50s} {'CAGR':>7s} {'Vol':>7s} {'Sharpe':>6s} {'MaxDD':>7s} {'Calmar':>6s}"

    # --- rebal=42d sweep ---
    _print_section("rebal=42d + concentration sweep", header)
    for n in [6, 8, 10, 12]:
        _run_config(f"rebal=42, top_n={n}", top_n=n, rebal_freq=42)

    # --- rebal=42d + momentum blend ---
    _print_section("rebal=42d + momentum blend", header)
    for blend in [0.0, 0.15, 0.20, 0.25, 0.30]:
        _run_config(f"rebal=42, top10, mom={blend:.0%}",
                    top_n=10, rebal_freq=42, mom_blend=blend)

    # --- rebal sweep around 42d ---
    _print_section("rebal frequency fine-tuning (top_n=10)", header)
    for freq in [30, 35, 42, 50, 63]:
        _run_config(f"rebal={freq}d, top10", top_n=10, rebal_freq=freq)

    # --- Best rebal + DD dampener ---
    _print_section("rebal=42d + DD dampener", header)
    for n in [10, 12]:
        for blend in [0.0, 0.20]:
            _run_config(f"rebal=42, top{n}, mom={blend:.0%}, DD",
                        top_n=n, rebal_freq=42, mom_blend=blend, dd_dampen=True)

    # --- Best rebal + asym vol ---
    _print_section("rebal=42d + asym_vol", header)
    for n in [10, 12]:
        _run_config(f"rebal=42, top{n}, asym_vol",
                    top_n=n, rebal_freq=42, asym_vol=True, asym_vol_floor=0.50)

    # --- Full combo ---
    _print_section("FULL COMBOS", header)
    combos = [
        ("rebal42 + top10 + asym_vol + DD", dict(top_n=10, rebal_freq=42, asym_vol=True, dd_dampen=True)),
        ("rebal42 + top10 + mom20% + asym_vol + DD", dict(top_n=10, rebal_freq=42, mom_blend=0.20, asym_vol=True, dd_dampen=True)),
        ("rebal42 + top12 + asym_vol + DD", dict(top_n=12, rebal_freq=42, asym_vol=True, dd_dampen=True)),
        ("rebal42 + top12 + mom20% + asym_vol + DD", dict(top_n=12, rebal_freq=42, mom_blend=0.20, asym_vol=True, dd_dampen=True)),
        ("rebal63 + top10 + asym_vol + DD", dict(top_n=10, rebal_freq=63, asym_vol=True, dd_dampen=True)),
        ("rebal63 + top12 + asym_vol + DD", dict(top_n=12, rebal_freq=63, asym_vol=True, dd_dampen=True)),
    ]

    # Seed the best from the first combo rather than from Sharpe 0: otherwise
    # a run where no combo has a positive Sharpe leaves best_rets as None and
    # the analysis below crashes.
    best_label, best_rets, best_m = None, None, None
    for label, kwargs in combos:
        rets, m = _run_config(label, **kwargs)
        if best_m is None or m["sharpe"] > best_m["sharpe"]:
            best_label, best_rets, best_m = label, rets, m
    best_sharpe = best_m["sharpe"]

    # --- Best: yearly breakdown ---
    print(f"\n{'=' * 90}")
    print(f"BEST: {best_label} (Sharpe={best_sharpe:.2f})")
    print(f"CAGR: {best_m['cagr']*100:.1f}% Vol: {best_m['vol']*100:.1f}% "
          f"Sharpe: {best_m['sharpe']:.2f} MaxDD: {best_m['max_dd']*100:.1f}% "
          f"Calmar: {best_m['calmar']:.2f}")
    print(f"{'=' * 90}")
    for year, ret in yearly_returns(best_rets).items():
        print(f" {year}: {ret*100:>+7.1f}%")

    # --- IS/OOS ---
    print("\n--- IS/OOS Validation ---")
    # The best run's returns are only sliced here, not re-fitted. The combo
    # was picked on full-period Sharpe, so "OOS" is a sub-period check rather
    # than a true out-of-sample test.
    is_m = compute_metrics(best_rets.loc["2016-04-01":"2022-12-31"])
    oos_m = compute_metrics(best_rets.loc["2023-01-01":"2026-05-13"])
    print(f" IS (2016-2022): CAGR {is_m['cagr']*100:.1f}% Sharpe {is_m['sharpe']:.2f} MaxDD {is_m['max_dd']*100:.1f}%")
    print(f" OOS (2023-2026): CAGR {oos_m['cagr']*100:.1f}% Sharpe {oos_m['sharpe']:.2f} MaxDD {oos_m['max_dd']*100:.1f}%")

    # --- Bootstrap ---
    print("\n--- Block Bootstrap (5000 samples, block=42d) ---")
    from research.trend_rider_p0 import block_bootstrap

    boot = block_bootstrap(best_rets, n_boot=5000, block_len=42)
    boot_sharpe = boot["sharpe"]
    boot_dd = boot["max_drawdown"]
    print(f" Sharpe: median={boot_sharpe.median():.2f} "
          f"5th={boot_sharpe.quantile(0.05):.2f} "
          f"95th={boot_sharpe.quantile(0.95):.2f}")
    print(f" MaxDD: median={boot_dd.median()*100:.1f}% "
          f"5th={boot_dd.quantile(0.05)*100:.1f}% "
          f"95th={boot_dd.quantile(0.95)*100:.1f}%")
    print(f" P(Sharpe > 1.5): {(boot_sharpe > 1.5).mean()*100:.1f}%")
    print(f" P(Sharpe > 1.0): {(boot_sharpe > 1.0).mean()*100:.1f}%")
|
||
|
||
|
||
# Run the full sweep only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|