Add 28 research scripts covering DCA simulation, momentum evaluation, Sharpe optimization, trend rider analysis, and US fundamentals exploration.
293 lines
10 KiB
Python
293 lines
10 KiB
Python
"""
|
||
Sharpe boost v2: Dispersion-adaptive exposure + momentum blend.
|
||
|
||
Key insight: Cross-sectional stock-picking signals (recovery, momentum) only
|
||
add value when there IS meaningful cross-sectional dispersion. In low-dispersion
|
||
regimes (2021: everything moves together), the signal is noise → reduce exposure.
|
||
|
||
Approach:
|
||
1. Compute rolling cross-sectional return dispersion (std of stock returns)
|
||
2. When dispersion < historical median → scale down to partial exposure
|
||
3. Combine with momentum blend + DD dampener
|
||
|
||
This is economically justified (not curve-fitting):
|
||
- Stock-picking alpha ∝ dispersion (proven in academic literature)
|
||
- Low dispersion = herd behavior = stock selection adds no value
|
||
- High dispersion = stock differentiation = signal is informative
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import sys
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
from strategies.base import Strategy
|
||
|
||
|
||
def _rank(df):
    """Return the row-wise percentile rank of each value; NaNs stay NaN."""
    return df.rank(axis="columns", pct=True, na_option="keep")
|
||
|
||
|
||
class DispersionAdaptiveEnsemble(Strategy):
    """
    Ensemble with dispersion-adaptive exposure.

    Holds an equal-weight book of the ``top_n`` columns ranked by a blend of
    recovery and momentum scores, re-picked every ``rebal_freq`` rows, and
    reduces exposure when cross-sectional dispersion is low (signal
    uninformative). Optionally applies a market-drawdown dampener on top.

    Parameters:
        rebal_freq: Number of rows between rebalances.
        top_n: Number of names held at each rebalance.
        mom_blend: Weight of the pure-momentum rank in the ensemble; the
            remainder is split evenly between the two recovery signals.
        disp_window: Rolling window used to smooth daily dispersion.
        disp_lookback: Rolling window over which the smoothed dispersion is
            percentile-ranked.
        disp_percentile: Percentile rank at or above which exposure is full.
            Below it, exposure is interpolated linearly towards
            ``disp_floor``. A value of 0 switches the filter off.
        disp_floor: Minimum exposure multiplier in the low-dispersion regime.
        dd_floor: Minimum multiplier produced by the drawdown dampener.
        dd_denom: Drawdown scale; the multiplier is ``1 + drawdown / dd_denom``
            before clipping to ``[dd_floor, 1]``.
        risk_managed: Whether to apply the drawdown dampener.
    """

    # Rows left flat before the first rebalance (longest lookback is 252 rows).
    _WARMUP = 252

    def __init__(
        self,
        rebal_freq: int = 21,
        top_n: int = 10,
        mom_blend: float = 0.25,
        # Dispersion filter
        disp_window: int = 21,
        disp_lookback: int = 252,
        disp_percentile: float = 0.40,  # below this percentile → reduce
        disp_floor: float = 0.50,  # minimum exposure in low-disp regime
        # DD dampener
        dd_floor: float = 0.40,
        dd_denom: float = 0.20,
        risk_managed: bool = True,
    ):
        self.rebal_freq = rebal_freq
        self.top_n = top_n
        self.mom_blend = mom_blend
        self.disp_window = disp_window
        self.disp_lookback = disp_lookback
        self.disp_percentile = disp_percentile
        self.disp_floor = disp_floor
        self.dd_floor = dd_floor
        self.dd_denom = dd_denom
        self.risk_managed = risk_managed

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Return a frame of portfolio weights with the same shape as ``data``.

        ``data`` is treated as a price panel: ranks and dispersion are taken
        across columns (axis=1) and rolling windows run down the rows
        (presumably one row per trading day and one column per ticker).
        Every component is lagged by one row so that the weight on row *t*
        only uses information available at row *t - 1*.
        """
        returns = data.pct_change()

        score = self._ensemble_score(data, returns)
        weights = self._hold_between_rebalances(self._top_n_weights(score))

        weights = weights.mul(self._dispersion_scale(returns), axis=0)
        if self.risk_managed:
            weights = weights.mul(self._drawdown_scale(returns), axis=0)
        return weights

    def _ensemble_score(self, prices: pd.DataFrame, returns: pd.DataFrame) -> pd.DataFrame:
        """Blend the two recovery signals and the momentum rank into one score."""
        # === Signal A: rec_mfilt + deep_upvol ===
        # Recovery = distance above the rolling 126-row low, kept only where
        # the 105-row return ending 21 rows ago is positive.
        rec_126 = prices / prices.rolling(126, min_periods=126).min() - 1
        mom_filter = prices.shift(21).pct_change(105)
        rec_mfilt_r = _rank(rec_126.where(mom_filter > 0, np.nan))

        # Sum of positive returns over the last 20 rows (NaN counts as 0).
        up_vol = returns.where(returns > 0, 0).rolling(20, min_periods=15).sum()
        deep_upvol_r = _rank(_rank(rec_126) * _rank(up_vol))

        signal_a = 0.5 * rec_mfilt_r + 0.5 * deep_upvol_r

        # === Signal B: Recovery 63d + 12-1 momentum ===
        rec_63 = prices / prices.rolling(63, min_periods=63).min() - 1
        mom_12_1 = prices.shift(21).pct_change(231)
        mom_r = _rank(mom_12_1)
        signal_b = 0.5 * _rank(rec_63) + 0.5 * mom_r

        # === Signal C: Pure momentum ===
        signal_c = mom_r

        # === Ensemble ===
        alpha = self.mom_blend
        return (1 - alpha) / 2 * signal_a + (1 - alpha) / 2 * signal_b + alpha * signal_c

    def _top_n_weights(self, score: pd.DataFrame) -> pd.DataFrame:
        """Equal-weight the ``top_n`` best scores per row; flat if too few are valid."""
        rank = score.rank(axis=1, ascending=False, na_option="bottom")
        enough = score.notna().sum(axis=1) >= self.top_n
        top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)

        raw = top_mask.astype(float)
        # Rows with no selection would divide by zero; NaN keeps them at 0 weight.
        row_sums = raw.sum(axis=1).replace(0, np.nan)
        return raw.div(row_sums, axis=0).fillna(0.0)

    def _hold_between_rebalances(self, weights: pd.DataFrame) -> pd.DataFrame:
        """Keep weights only on rebalance rows, carry them forward, then lag one row."""
        warmup = self._WARMUP
        rebal_mask = pd.Series(False, index=weights.index)
        rebal_mask.iloc[list(range(warmup, len(weights), self.rebal_freq))] = True

        held = weights.copy()
        held[~rebal_mask] = np.nan
        held = held.ffill().fillna(0.0)
        held.iloc[:warmup] = 0.0
        return held.shift(1).fillna(0.0)  # PIT

    def _dispersion_scale(self, returns: pd.DataFrame) -> pd.Series:
        """Per-row exposure multiplier in ``[disp_floor, 1]``, lagged one row."""
        if self.disp_percentile == 0:
            # Filter switched off: full exposure everywhere, without relying
            # on division by zero collapsing to 1.0 after clipping.
            return pd.Series(1.0, index=returns.index)

        # Cross-sectional dispersion: std of stock returns each day
        cs_disp = returns.std(axis=1)
        # Rolling mean of dispersion (min_periods capped so short windows work)
        disp_smooth = cs_disp.rolling(
            self.disp_window, min_periods=min(10, self.disp_window)
        ).mean()
        # Historical percentile rank
        disp_pctile = disp_smooth.rolling(
            self.disp_lookback, min_periods=min(126, self.disp_lookback)
        ).rank(pct=True)

        # Scale: 1.0 when dispersion is high, floor when low
        # Linear interpolation between floor and 1.0
        ramp = (disp_pctile / self.disp_percentile).clip(0.0, 1.0)
        disp_scale = self.disp_floor + (1.0 - self.disp_floor) * ramp
        # PIT: use yesterday's dispersion estimate; full exposure until one exists
        return disp_scale.shift(1).fillna(1.0)

    def _drawdown_scale(self, returns: pd.DataFrame) -> pd.Series:
        """Per-row multiplier in ``[dd_floor, 1]`` from the equal-weight market drawdown."""
        mkt_rets = returns.fillna(0.0).mean(axis=1)
        mkt_eq = (1 + mkt_rets).cumprod()
        mkt_dd = mkt_eq / mkt_eq.cummax() - 1
        dd_scale = (1.0 + mkt_dd / self.dd_denom).clip(
            lower=self.dd_floor, upper=1.0
        )
        return dd_scale.shift(1).fillna(1.0)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Evaluation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def compute_metrics(daily_rets: pd.Series) -> dict:
    """
    Summarise a daily return series with annualised performance metrics.

    Annualisation uses 252 periods per year. Returns a dict with keys
    ``cagr``, ``vol``, ``sharpe``, ``max_dd`` and ``calmar``. Ratios whose
    denominator is zero are reported as 0.0, and an empty series yields 0.0
    for every metric instead of raising.
    """
    if daily_rets.empty:
        return {"cagr": 0.0, "vol": 0.0, "sharpe": 0.0, "max_dd": 0.0, "calmar": 0.0}

    equity = (1 + daily_rets).cumprod()
    n_years = len(daily_rets) / 252.0
    cagr = equity.iloc[-1] ** (1.0 / n_years) - 1.0

    daily_std = daily_rets.std()
    annualiser = np.sqrt(252)
    vol = daily_std * annualiser
    # A NaN std (single observation) fails the comparison and falls back to 0.0.
    sharpe = daily_rets.mean() / daily_std * annualiser if daily_std > 0 else 0.0

    max_dd = (equity / equity.cummax() - 1).min()
    calmar = cagr / abs(max_dd) if max_dd != 0 else 0.0
    return {"cagr": cagr, "vol": vol, "sharpe": sharpe, "max_dd": max_dd, "calmar": calmar}
|
||
|
||
|
||
def yearly_returns(daily_rets: pd.Series) -> pd.Series:
    """
    Compound daily returns into calendar-year returns.

    ``daily_rets`` must be indexed by a ``DatetimeIndex``. The result is
    indexed by integer year. The first year is measured from a starting
    equity of 1.0, later years from the previous year-end equity. An empty
    input gives an empty result.
    """
    equity = (1 + daily_rets).cumprod()
    # Grouping by calendar year avoids the version-specific "YE"/"A" resample
    # aliases and yields the year-labelled index directly.
    year_end = equity.groupby(equity.index.year).last()
    return year_end / year_end.shift(1, fill_value=1.0) - 1
|
||
|
||
|
||
# Module-level cache: backtest_strategy stores the loaded price panel under
# the key "data" so repeated backtests in one run reuse it.
_DATA_CACHE = {}
|
||
|
||
|
||
def backtest_strategy(strategy, start="2016-04-01", end="2026-05-13"):
    """
    Run ``strategy`` over the cached "us" price panel and return its daily returns.

    On the first call the S&P 500 ticker list is fetched, the data store is
    updated and the panel is loaded and cached; later calls reuse the cache.
    The portfolio return on each row is the weight-times-return sum across
    columns, and the result is label-sliced to ``start``..``end``.

    Raises:
        RuntimeError: if the cached panel is ``None``.
    """
    import data_manager

    try:
        prices = _DATA_CACHE["data"]
    except KeyError:
        from universe import get_sp500

        sp500_tickers = get_sp500()
        data_manager.update("us", sp500_tickers)
        prices = data_manager.load("us")
        _DATA_CACHE["data"] = prices

    if prices is None:
        raise RuntimeError("No data loaded")

    target_weights = strategy.generate_signals(prices)
    asset_rets = prices.pct_change().fillna(0.0)
    portfolio_rets = (target_weights * asset_rets).sum(axis=1)
    return portfolio_rets.loc[start:end]
|
||
|
||
|
||
def _print_sweep(title, configs, risk_managed):
    """Backtest every (disp_percentile, disp_floor) pair and print one metrics row each."""
    print(title)
    print(f"{'disp_pct':>8s} {'floor':>6s} {'CAGR':>7s} {'Vol':>7s} {'Sharpe':>7s} {'MaxDD':>7s} {'Calmar':>7s}")
    print("-" * 60)

    for disp_pct, floor in configs:
        strat = DispersionAdaptiveEnsemble(
            top_n=10, mom_blend=0.25, disp_percentile=disp_pct,
            disp_floor=floor, risk_managed=risk_managed
        )
        m = compute_metrics(backtest_strategy(strat))
        print(f"{disp_pct:>8.2f} {floor:>6.2f} {m['cagr']*100:>6.1f}% {m['vol']*100:>6.1f}% "
              f"{m['sharpe']:>7.2f} {m['max_dd']*100:>6.1f}% {m['calmar']:>7.2f}")


def main():
    """
    Run the dispersion-filter experiments and print the results.

    Prints, in order: a parameter sweep without and with the drawdown
    dampener, the yearly breakdown of the chosen configuration, an
    unfiltered baseline, and per-year dispersion diagnostics.
    """
    banner = "=" * 80
    print(banner)
    print("SHARPE BOOST v2: Dispersion-Adaptive Exposure")
    print(banner)

    # (disp_percentile, disp_floor) pairs shared by both sweeps.
    configs = [
        (0.30, 0.40),
        (0.30, 0.50),
        (0.40, 0.40),
        (0.40, 0.50),
        (0.40, 0.60),
        (0.50, 0.40),
        (0.50, 0.50),
        (0.50, 0.60),
    ]

    # --- Test 1: Dispersion filter only (no DD dampener) ---
    _print_sweep(
        "\n--- Dispersion filter sweep (risk_managed=False) ---",
        configs,
        risk_managed=False,
    )

    # --- Test 2: Dispersion filter + DD dampener ---
    _print_sweep(
        "\n--- Dispersion filter + DD dampener (risk_managed=True) ---",
        configs,
        risk_managed=True,
    )

    # --- Test 3: Best dispersion config — yearly breakdown ---
    print(f"\n{banner}")
    print("BEST CONFIG: disp_pct=0.40, floor=0.50, risk_managed=True")
    print(banner)

    best_strat = DispersionAdaptiveEnsemble(
        top_n=10, mom_blend=0.25, disp_percentile=0.40,
        disp_floor=0.50, risk_managed=True
    )
    best_rets = backtest_strategy(best_strat)
    best_m = compute_metrics(best_rets)
    print(f"CAGR: {best_m['cagr']*100:.1f}% Vol: {best_m['vol']*100:.1f}% "
          f"Sharpe: {best_m['sharpe']:.2f} MaxDD: {best_m['max_dd']*100:.1f}% "
          f"Calmar: {best_m['calmar']:.2f}")

    print("\n--- Yearly returns ---")
    for year, year_ret in yearly_returns(best_rets).items():
        print(f" {year}: {year_ret*100:>+7.1f}%")

    # --- Test 4: No filter baseline for comparison ---
    print("\n--- Baseline (no dispersion filter, no DD) ---")
    baseline = DispersionAdaptiveEnsemble(
        top_n=10, mom_blend=0.25, disp_percentile=0.0,
        disp_floor=1.0, risk_managed=False
    )
    base_m = compute_metrics(backtest_strategy(baseline))
    print(f"CAGR: {base_m['cagr']*100:.1f}% Vol: {base_m['vol']*100:.1f}% "
          f"Sharpe: {base_m['sharpe']:.2f} MaxDD: {base_m['max_dd']*100:.1f}%")

    # --- Test 5: Dispersion diagnostics for 2021 ---
    print(f"\n{banner}")
    print("DISPERSION DIAGNOSTIC: Is 2021 actually low dispersion?")
    print(banner)

    # The backtests above have already populated the cache.
    data = _DATA_CACHE["data"]
    cs_disp = data.pct_change().std(axis=1)
    disp_smooth = cs_disp.rolling(21, min_periods=10).mean()

    for year in range(2017, 2027):
        # Explicit date slices return an empty series for years outside the
        # data; partial-string lookup (.loc["2026"]) can raise KeyError there.
        yr_disp = disp_smooth.loc[f"{year}-01-01":f"{year}-12-31"]
        if yr_disp.empty:
            continue
        print(f" {year}: avg disp = {yr_disp.mean()*100:.2f}% "
              f"median = {yr_disp.median()*100:.2f}%")
|
||
|
||
|
||
# Script entry point: run the experiments only when executed directly.
if __name__ == "__main__":
    main()
|