Files
quant/strategies/ensemble_alpha.py
Gahow Wang 149a00c458 chore: backtest engine fee model, metrics, and strategy fixes
- main.py: add IBKR-style tiered fee schedule (fee_base + fee_per_share),
  PIT universe support, and open-to-close execution improvements
- metrics.py: add raw_summary helper for JSON-safe metric export
- Misc strategy fixes: deprecation warnings, NaN handling

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-21 20:57:56 +08:00

387 lines
15 KiB
Python

"""
Round 3: Signal-level ensemble of the two best strategies.
Key insight from R1/R2:
- FactorCombo rec_mfilt+deep_upvol: CAGR 34.6%, MaxDD -33.9%, Calmar 1.02
- Recovery+Mom Top20: CAGR 34.5%, MaxDD -37.7%, Calmar 0.91
- Inv-vol weighting HURTS recovery signals (they're high-vol by nature)
- More factors = more noise for this alpha source
- Monthly rebalancing is optimal
New approach:
1. Ensemble the two best SIGNALS (not strategies) at the rank level
→ diversifies stock picks while preserving signal strength
2. Equal weighting (proven better for recovery-type signals)
3. Tail-risk protection: only scale down in EXTREME drawdown regimes
(>15% drawdown from peak), not regular vol spikes
4. Test whether a 126-day recovery (deeper) adds signal vs 63-day
"""
import numpy as np
import pandas as pd
from strategies.base import Strategy
def _rank(df):
return df.rank(axis=1, pct=True, na_option="keep")
class EnsembleAlphaStrategy(Strategy):
"""
Ensemble of the two strongest signals with tail-risk protection.
"""
def __init__(
self,
rebal_freq: int = 21,
top_n: int = 20,
tail_protection: bool = True,
tail_threshold: float = -0.15, # drawdown level to trigger protection
tail_scale: float = 0.5, # how much to reduce in tail event
):
self.rebal_freq = rebal_freq
self.top_n = top_n
self.tail_protection = tail_protection
self.tail_threshold = tail_threshold
self.tail_scale = tail_scale
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
p = data
# === Signal A: rec_mfilt + deep_upvol (from FactorCombo) ===
rec_126 = p / p.rolling(126, min_periods=126).min() - 1
mom_filter = p.shift(21).pct_change(105)
rec_mfilt = rec_126.where(mom_filter > 0, np.nan)
rec_mfilt_r = _rank(rec_mfilt)
ret = p.pct_change()
up_vol = ret.where(ret > 0, 0).rolling(20, min_periods=15).sum()
deep_upvol = _rank(rec_126) * _rank(up_vol)
deep_upvol_r = _rank(deep_upvol)
signal_a = 0.5 * rec_mfilt_r + 0.5 * deep_upvol_r
# === Signal B: Recovery 63d + 12-1 momentum (from RecoveryMom) ===
rec_63 = p / p.rolling(63, min_periods=63).min() - 1
mom_12_1 = p.shift(21).pct_change(231)
rec_63_r = _rank(rec_63)
mom_r = _rank(mom_12_1)
signal_b = 0.5 * rec_63_r + 0.5 * mom_r
# === Ensemble: average of both signals ===
ensemble = 0.5 * signal_a + 0.5 * signal_b
# === Select top_n ===
rank = ensemble.rank(axis=1, ascending=False, na_option="bottom")
n_valid = ensemble.notna().sum(axis=1)
enough = n_valid >= self.top_n
top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)
# Equal weight (proven better for recovery signals)
raw = top_mask.astype(float)
row_sums = raw.sum(axis=1).replace(0, np.nan)
signals = raw.div(row_sums, axis=0).fillna(0.0)
# === Monthly rebalance ===
warmup = 252
rebal_mask = pd.Series(False, index=data.index)
rebal_indices = list(range(warmup, len(data), self.rebal_freq))
rebal_mask.iloc[rebal_indices] = True
signals[~rebal_mask] = np.nan
signals = signals.ffill().fillna(0.0)
signals.iloc[:warmup] = 0.0
# === Tail-risk protection (rebal-gated; NaN-safe market mean) ===
# Apply AFTER ffill so the scale changes only at rebal points,
# otherwise daily flips force half-out/half-in trades that burn
# the account through fixed per-trade fees.
if self.tail_protection:
mkt_ret = ret.mean(axis=1, skipna=True)
mkt_eq = (1 + mkt_ret.fillna(0.0)).cumprod()
mkt_dd = mkt_eq / mkt_eq.cummax() - 1
in_tail = mkt_dd < self.tail_threshold
scale_raw = pd.Series(1.0, index=data.index)
scale_raw[in_tail] = self.tail_scale
scale = scale_raw.where(rebal_mask, np.nan).ffill().fillna(1.0)
signals = signals.mul(scale, axis=0)
return signals.shift(1).fillna(0.0)
class EnhancedFactorComboStrategy(Strategy):
"""
FactorCombo signal enhanced with:
1. Additional momentum confirmation (12-1 momentum rank as tiebreaker)
2. Concentration in top conviction names (top_n=15 instead of 20)
3. Optional tail protection
"""
def __init__(
self,
rebal_freq: int = 21,
top_n: int = 15,
mom_boost: float = 0.2, # weight given to additional momentum signal
tail_protection: bool = False,
):
self.rebal_freq = rebal_freq
self.top_n = top_n
self.mom_boost = mom_boost
self.tail_protection = tail_protection
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
p = data
# Core signal: rec_mfilt + deep_upvol
rec_126 = p / p.rolling(126, min_periods=126).min() - 1
mom_filter = p.shift(21).pct_change(105)
rec_mfilt = rec_126.where(mom_filter > 0, np.nan)
rec_mfilt_r = _rank(rec_mfilt)
ret = p.pct_change()
up_vol = ret.where(ret > 0, 0).rolling(20, min_periods=15).sum()
deep_upvol = _rank(rec_126) * _rank(up_vol)
deep_upvol_r = _rank(deep_upvol)
base_signal = 0.5 * rec_mfilt_r + 0.5 * deep_upvol_r
# Momentum boost: 12-1 month return rank
mom_12_1 = p.shift(21).pct_change(231)
mom_r = _rank(mom_12_1)
# Combined: base + momentum tiebreaker
signal = (1 - self.mom_boost) * base_signal + self.mom_boost * mom_r
# Select top_n
rank = signal.rank(axis=1, ascending=False, na_option="bottom")
n_valid = signal.notna().sum(axis=1)
enough = n_valid >= self.top_n
top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)
# Equal weight
raw = top_mask.astype(float)
row_sums = raw.sum(axis=1).replace(0, np.nan)
signals = raw.div(row_sums, axis=0).fillna(0.0)
# Monthly rebalance
warmup = 252
rebal_mask = pd.Series(False, index=data.index)
rebal_indices = list(range(warmup, len(data), self.rebal_freq))
rebal_mask.iloc[rebal_indices] = True
signals[~rebal_mask] = np.nan
signals = signals.ffill().fillna(0.0)
signals.iloc[:warmup] = 0.0
# Tail protection (rebal-gated to avoid daily-flip turnover)
if self.tail_protection:
mkt_ret = ret.mean(axis=1, skipna=True)
mkt_eq = (1 + mkt_ret.fillna(0.0)).cumprod()
mkt_dd = mkt_eq / mkt_eq.cummax() - 1
in_tail = mkt_dd < -0.15
scale_raw = pd.Series(1.0, index=data.index)
scale_raw[in_tail] = 0.5
scale = scale_raw.where(rebal_mask, np.nan).ffill().fillna(1.0)
signals = signals.mul(scale, axis=0)
return signals.shift(1).fillna(0.0)
class RiskManagedEnsembleStrategy(Strategy):
"""
EnsembleAlpha with market-aware drawdown risk management.
Key insight: Using the strategy's OWN drawdown to scale down creates
a negative feedback loop (cut → miss rebound → deeper DD → cut more).
Instead, use MARKET drawdown as the systemic risk signal:
- Market crash → reduce exposure (systemic risk)
- Strategy underperforms but market is fine → stay invested (alpha issue, not risk)
Mechanisms:
1. Market DD dampener: scales down proportionally to equal-weight market drawdown.
Only fires during systemic stress. Recovers as market recovers.
2. Vol spike guard: when 10-day portfolio vol > 90th percentile of history,
reduce to vol_spike_floor. Catches acute crises.
Both use lagged (T-1) estimates → PIT-safe.
Parameter choices justified by market microstructure (not optimized):
- dd_denom=0.20 → at 20% market crash, exposure reduced to floor
- dd_floor=0.40 → never go below 40% (still participate in recovery)
- vol_spike_floor=0.50 → during vol spikes, halve exposure
"""
def __init__(
self,
top_n: int = 10,
dd_floor: float = 0.40,
dd_denom: float = 0.20,
vol_spike_guard: bool = True,
vol_spike_window: int = 10,
vol_spike_lookback: int = 252,
vol_spike_floor: float = 0.50,
):
self.ensemble = EnsembleAlphaStrategy(top_n=top_n, tail_protection=False)
self.dd_floor = dd_floor
self.dd_denom = dd_denom
self.vol_spike_guard = vol_spike_guard
self.vol_spike_window = vol_spike_window
self.vol_spike_lookback = vol_spike_lookback
self.vol_spike_floor = vol_spike_floor
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
# Step 1: Get raw signals from the ensemble (already shifted by 1)
raw = self.ensemble.generate_signals(data)
# Step 2: Compute MARKET returns over valid (non-masked) columns.
# `daily_rets` keeps NaN for PIT-masked tickers so they don't dilute
# the cross-sectional mean to ~0.
daily_rets = data.pct_change()
mkt_rets = daily_rets.mean(axis=1, skipna=True)
# Step 3: Market drawdown dampener
mkt_eq = (1 + mkt_rets.fillna(0.0)).cumprod()
mkt_dd = mkt_eq / mkt_eq.cummax() - 1
dd_scale_raw = (1.0 + mkt_dd / self.dd_denom).clip(
lower=self.dd_floor, upper=1.0,
)
# Step 4: Vol spike guard from portfolio returns (NaN-aware sum)
if self.vol_spike_guard:
port_rets = (raw * daily_rets).sum(axis=1, min_count=1).fillna(0.0)
short_vol = port_rets.rolling(self.vol_spike_window, min_periods=5).std() * np.sqrt(252)
vol_90th = short_vol.rolling(self.vol_spike_lookback, min_periods=126).quantile(0.90)
in_spike = short_vol > vol_90th
vol_scale_raw = pd.Series(1.0, index=data.index)
vol_scale_raw[in_spike] = self.vol_spike_floor
else:
vol_scale_raw = pd.Series(1.0, index=data.index)
# Step 5: Combined scaling, sampled at the inner ensemble's rebal
# cadence so we don't trade in/out daily (which would incur huge
# fixed-fee costs).
combined = (dd_scale_raw * vol_scale_raw).shift(1).fillna(1.0)
rebal_freq = getattr(self.ensemble, "rebal_freq", 21)
warmup = 252
rebal_mask = pd.Series(False, index=data.index)
rebal_indices = list(range(warmup, len(data), rebal_freq))
rebal_mask.iloc[rebal_indices] = True
final_scale = combined.where(rebal_mask, np.nan).ffill().fillna(1.0)
return raw.mul(final_scale, axis=0)
class SharpeBoostedEnsembleStrategy(Strategy):
"""
Optimized ensemble targeting Sharpe >1.5 while maintaining high CAGR.
Key improvements over EnsembleAlphaStrategy:
1. Bimonthly rebalance (42d): recovery signals have 126-day lookback,
monthly rebal causes unnecessary turnover. Let winners run.
2. Slightly wider basket (top_n=12): diversifies idiosyncratic risk
without diluting alpha (sweet spot between 10-15).
3. Asymmetric vol scaling: only de-risk in high-vol NEGATIVE return
regimes (high-vol + positive = good, don't cut).
4. Light market-DD dampener: only fires in severe systemic stress
(dd_denom=0.35 → need 35% market crash to reach floor).
PIT compliance:
- All signal lookbacks use .shift(21) or rolling windows (no current-day data)
- Asymmetric vol uses .shift(1) on scale
- DD dampener uses .shift(1) on mkt_dd
- Final signals use .shift(1) for execution lag
Parameter count: 4 meaningful (rebal_freq, top_n, asym_vol_floor, dd_denom)
All have economic justification, not optimized on in-sample.
"""
def __init__(
self,
top_n: int = 12,
rebal_freq: int = 42,
asym_vol_floor: float = 0.50,
dd_floor: float = 0.70,
dd_denom: float = 0.35,
):
self.top_n = top_n
self.rebal_freq = rebal_freq
self.asym_vol_floor = asym_vol_floor
self.dd_floor = dd_floor
self.dd_denom = dd_denom
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
p = data
ret = p.pct_change()
# === Signal A: rec_mfilt + deep_upvol ===
rec_126 = p / p.rolling(126, min_periods=126).min() - 1
mom_filter = p.shift(21).pct_change(105)
rec_mfilt = rec_126.where(mom_filter > 0, np.nan)
rec_mfilt_r = _rank(rec_mfilt)
up_vol = ret.where(ret > 0, 0).rolling(20, min_periods=15).sum()
deep_upvol = _rank(rec_126) * _rank(up_vol)
deep_upvol_r = _rank(deep_upvol)
signal_a = 0.5 * rec_mfilt_r + 0.5 * deep_upvol_r
# === Signal B: Recovery 63d + 12-1 momentum ===
rec_63 = p / p.rolling(63, min_periods=63).min() - 1
mom_12_1 = p.shift(21).pct_change(231)
rec_63_r = _rank(rec_63)
mom_r = _rank(mom_12_1)
signal_b = 0.5 * rec_63_r + 0.5 * mom_r
# === Ensemble: equal-weight average of both signals ===
ensemble = 0.5 * signal_a + 0.5 * signal_b
# === Select top_n ===
rank = ensemble.rank(axis=1, ascending=False, na_option="bottom")
n_valid = ensemble.notna().sum(axis=1)
enough = n_valid >= self.top_n
top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)
raw = top_mask.astype(float)
row_sums = raw.sum(axis=1).replace(0, np.nan)
signals = raw.div(row_sums, axis=0).fillna(0.0)
# === Bimonthly rebalance (42 trading days) ===
warmup = 252
rebal_mask = pd.Series(False, index=data.index)
rebal_indices = list(range(warmup, len(data), self.rebal_freq))
rebal_mask.iloc[rebal_indices] = True
signals[~rebal_mask] = np.nan
signals = signals.ffill().fillna(0.0)
signals.iloc[:warmup] = 0.0
signals = signals.shift(1).fillna(0.0) # PIT: 1-day execution lag
# === Asymmetric vol scaling ===
# NB: scales are RE-EVALUATED only on rebalance days. Daily flips of
# asym/dd scales would force half-in/half-out trades each session,
# burning the account through fixed per-trade fees ($2 US / $5 CN).
# Use cross-sectional mean of non-masked returns so PIT-masked
# NaN→0 fills don't dilute the market signal.
daily_rets = data.pct_change()
port_rets = (signals * daily_rets).sum(axis=1, min_count=1).fillna(0.0)
short_vol = port_rets.rolling(20, min_periods=10).std() * np.sqrt(252)
vol_median = short_vol.rolling(252, min_periods=126).median()
recent_ret = port_rets.rolling(20, min_periods=10).sum()
high_vol_neg = (short_vol > vol_median * 1.5) & (recent_ret < 0)
asym_scale_raw = pd.Series(1.0, index=data.index)
asym_scale_raw[high_vol_neg] = self.asym_vol_floor
# === Market-DD dampener (rebal-gated, NaN-aware market mean) ===
mkt_rets = daily_rets.mean(axis=1, skipna=True)
mkt_eq = (1 + mkt_rets.fillna(0.0)).cumprod()
mkt_dd = mkt_eq / mkt_eq.cummax() - 1
dd_scale_raw = (1.0 + mkt_dd / self.dd_denom).clip(
lower=self.dd_floor, upper=1.0
)
# Sample scales at rebal points only, then step-hold between rebals.
combined = (asym_scale_raw * dd_scale_raw).shift(1).fillna(1.0)
rebal_scale = combined.where(rebal_mask, np.nan).ffill().fillna(1.0)
signals = signals.mul(rebal_scale, axis=0)
return signals