359 lines
15 KiB
Python
359 lines
15 KiB
Python
"""
|
||
Alpha factor library — price-only, academically motivated, with a rolling-IC
|
||
combiner, inverse-vol portfolio weighting, and volatility targeting.
|
||
|
||
Factors (each returns a cross-sectional DataFrame aligned to prices.index):
|
||
|
||
mom_12_1 12-1 month momentum (Jegadeesh & Titman 1993).
|
||
mom_7_1 Intermediate 7-1m momentum (Novy-Marx 2012).
|
||
mom_residual Market-residualized 12-1m (Blitz-Huij-Martens 2011).
|
||
rev_1m 1-month reversal × -1 (Jegadeesh 1990 / short-term reversal).
|
||
w52_high Price / 52-week high, proximity factor (George & Hwang 2004).
|
||
max5_neg -avg(top-5 daily returns past 21d) — lottery/MAX (Bali-Cakici-Whitelaw 2011).
|
||
idio_vol_neg -residual-vol from 60d market regression (Ang-Hodrick-Xing-Zhang 2006).
|
||
low_beta -60d market beta (Betting Against Beta, Frazzini-Pedersen 2014 variant).
|
||
trend_strength Slope / RMSE from 63d log-price regression.
|
||
recovery_63 Price / 63d low - 1 (project-native, V-rebound proxy).
|
||
|
||
Combiner:
|
||
- Cross-sectional percentile-rank each factor (NaN = keep).
|
||
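- For each day, blend factors with weights proportional to a rolling 252-day
  factor-quality score: the long-short decile Sharpe by default
  (weight_scheme="ls_sharpe"), or the Information Coefficient (Spearman rank
  corr vs forward 21d return) when weight_scheme="ic".
- Weights are lagged by 21 days to avoid lookahead; with the default
  min_weight=0.0, factors whose lagged weight is not positive are dropped
  (weight 0). Only with min_weight=None do negative-weight factors enter the
  blend, and then with a negative sign.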
|
||
|
||
Portfolio:
|
||
- Rank composite score, pick top_n (default 15) on a rebalance_freq schedule.
|
||
- Inverse-vol weight within top_n (60d realized vol).
|
||
- Volatility-target the whole portfolio to target_vol (default 18%) using a
|
||
trailing 60-day portfolio-vol estimate; exposure clipped to [0.3, 1.5].
|
||
- Shift(1) at the end for T-1 signal delivery, matching the project convention.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
from strategies.base import Strategy
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Factor primitives
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _pct(p, n):
    """Return the simple ``n``-row percentage change of ``p``.

    Gaps are not forward-filled (``fill_method=None``), so a missing value at
    either end of the ``n``-row span produces NaN in the result.
    """
    change = p.pct_change(periods=n, fill_method=None)
    return change
|
||
|
||
|
||
def f_mom_12_1(p):
    """12-1 month momentum: return from 252 rows ago to 21 rows ago.

    The most recent 21 rows are skipped by lagging the prices first; the
    231-row change of the lagged prices then spans rows t-252 .. t-21.
    """
    skip_recent = p.shift(periods=21)
    return skip_recent.pct_change(periods=231, fill_method=None)
|
||
|
||
|
||
def f_mom_7_1(p):
    """7-1 month momentum: return from 147 rows ago to 21 rows ago.

    Prices are lagged by 21 rows to skip the latest month, then the 126-row
    change of the lagged prices is taken.
    """
    skip_recent = p.shift(periods=21)
    return skip_recent.pct_change(periods=126, fill_method=None)
|
||
|
||
|
||
def f_rev_1m(p):
    """Short-term reversal: the negated 21-row percentage change.

    Names that fell over the last 21 rows receive the highest scores.
    """
    last_month = p.pct_change(periods=21, fill_method=None)
    return -last_month
|
||
|
||
|
||
def f_w52_high(p):
    """Proximity to the trailing 252-row high.

    Returns ``price / rolling_max - 1``: 0 when the latest price is the high
    of the window, negative otherwise. At least 200 observations are required
    in the window before a value is produced.
    """
    trailing_high = p.rolling(window=252, min_periods=200).max()
    proximity = p / trailing_high
    return proximity - 1
|
||
|
||
|
||
def f_max5_neg(p):
    """Negated mean of the five largest daily returns in the last 21 rows.

    Daily returns are computed without forward-filling gaps. A value is
    produced once a 21-row window holds at least 15 non-missing returns.

    Missing or non-finite returns inside the window are discarded before the
    top five are picked. ``np.sort`` places NaNs at the end of the sorted
    array, so slicing ``[-5:]`` on the raw window would pick up the NaNs and
    turn every window with a single gap into NaN, defeating ``min_periods``.

    Returns a frame shaped like ``p``; higher (less negative) values mean
    smaller recent extreme up-moves.
    """
    daily_ret = p.pct_change(fill_method=None)

    def _mean_top5(window):
        # Keep only usable observations so gaps cannot leak into the top five.
        finite = window[np.isfinite(window)]
        if finite.size < 5:
            return np.nan
        return np.mean(np.sort(finite)[-5:])

    top5 = daily_ret.rolling(21, min_periods=15).apply(_mean_top5, raw=True)
    return -top5
|
||
|
||
|
||
def f_recovery_63(p):
    """Rebound off the trailing 63-row low: ``price / rolling_min - 1``.

    The value is 0 when the latest price is the window low and positive
    otherwise. At least 60 observations are required in the window.
    """
    trailing_low = p.rolling(window=63, min_periods=60).min()
    rebound = p / trailing_low
    return rebound - 1
|
||
|
||
|
||
def f_trend_strength(p):
    """
    Log-price trend strength: rolling OLS slope ÷ residual RMSE on a 63-row
    window. t-stat-like measure of directional trend quality.

    For each column the log price is regressed on a time index over the
    trailing 63 rows. The slope is obtained from the centred-time weighted sum
    Σ (i - ī)·y (equal to Σ (i - ī)(y - ȳ) because Σ (i - ī) = 0), and the
    residual variance from var(y) minus the explained variance
    slope² · Σ(i - ī)² / n.

    Zero prices are treated as missing before taking logs. Rows whose window
    contains any missing log price are NaN. Inputs with fewer than 63 rows
    return an all-NaN frame instead of raising.

    Returns a DataFrame with the same index and columns as ``p``.
    """
    n = 63
    logp = np.log(p.replace(0, np.nan))

    # Centred time index and its sum of squares (constant across windows).
    t_centered = np.arange(n, dtype=float)
    t_centered = t_centered - t_centered.mean()
    t_ss = (t_centered ** 2).sum()

    # Numerator Σ (i - ī)·y for every window, one column at a time.
    # NaNs are zero-filled only so the correlation is defined; affected rows
    # are masked out at the end.
    filled = logp.fillna(0.0).to_numpy(dtype=float)
    num_arr = np.full(filled.shape, np.nan, dtype=float)
    if filled.shape[0] >= n:
        # With fewer than n rows "valid" mode would swap its operands and
        # return an array that does not fit the (empty) output slice.
        for j in range(filled.shape[1]):
            num_arr[n - 1:, j] = np.correlate(filled[:, j], t_centered, mode="valid")
    num = pd.DataFrame(num_arr, index=logp.index, columns=logp.columns)

    slope = num / t_ss
    # Residual variance: Σ(y - ȳ)² / n - slope² · Σ(i - ī)² / n, floored to
    # keep the square root and the division well defined.
    var_y = logp.rolling(n, min_periods=n).var(ddof=0)
    resid_var = (var_y - (slope ** 2) * t_ss / n).clip(lower=1e-18)
    rmse = np.sqrt(resid_var)
    ts = slope / rmse

    # Mask rows where the window contained any NaN.
    valid = logp.rolling(n, min_periods=n).count() == n
    return ts.where(valid)
|
||
|
||
|
||
def _rolling_beta_and_residvol(p, mkt_ret, window=60):
    """Rolling market beta and residual volatility for every column of ``p``.

    Daily returns of ``p`` (no gap filling) are compared with ``mkt_ret``
    reindexed onto ``p.index``. Over a full ``window`` of rows:

    * beta          = cov(stock, market) / var(market)
    * residual vol  = sqrt(max(var(stock) - beta² · var(market), 0))

    Returns a ``(beta, residual_vol)`` pair of DataFrames keyed by the columns
    of ``p``.
    """
    stock_rets = p.pct_change(fill_method=None)
    mkt = mkt_ret.reindex(p.index)
    # The market variance is the same for every name, so compute it once.
    mkt_var = mkt.rolling(window, min_periods=window).var()

    beta_by_name = {}
    resid_by_name = {}
    for name in stock_rets.columns:
        one = stock_rets[name]
        covar = one.rolling(window, min_periods=window).cov(mkt)
        name_beta = covar / mkt_var
        own_var = one.rolling(window, min_periods=window).var()
        # Total variance minus the market-explained part; clip guards against
        # tiny negative values from floating-point cancellation.
        unexplained = (own_var - name_beta ** 2 * mkt_var).clip(lower=0)
        beta_by_name[name] = name_beta
        resid_by_name[name] = np.sqrt(unexplained)

    return pd.DataFrame(beta_by_name), pd.DataFrame(resid_by_name)
|
||
|
||
|
||
def f_mom_residual(p, mkt_ret, betas=None, window=60):
    """Market-residualised 12-1 month momentum.

    Computes ``stock cumulative return - beta * market cumulative return``
    where both cumulative returns span rows t-252 .. t-21. ``betas`` may be
    supplied by the caller; when omitted they are estimated with
    ``_rolling_beta_and_residvol`` over ``window`` rows.

    Returns a DataFrame reindexed to ``p.index``.
    """
    if betas is None:
        betas, _ = _rolling_beta_and_residvol(p, mkt_ret, window=window)

    # Put the market series on the price index first so the arithmetic below
    # cannot widen the result to a union index.
    mkt_on_grid = mkt_ret.reindex(p.index)

    def _compound(gross):
        return np.prod(gross) - 1

    # Market leg: compounded 231-row return, then lagged 21 rows.
    mkt_leg = (1 + mkt_on_grid).rolling(231).apply(_compound, raw=True).shift(21)
    # Stock leg: same span, taken directly from lagged prices.
    stock_leg = p.shift(21).pct_change(231, fill_method=None)

    hedge = betas.mul(mkt_leg, axis=0)
    residual = stock_leg.sub(hedge, fill_value=np.nan)
    return residual.reindex(p.index)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cross-sectional rank helper
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def xsec_rank(df: pd.DataFrame) -> pd.DataFrame:
    """Percentile-rank each row across its columns, leaving NaNs as NaN."""
    ranked = df.rank(axis="columns", pct=True, na_option="keep")
    return ranked
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Rolling IC computation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def rolling_ic(factor_rank: pd.DataFrame, fwd_ret: pd.DataFrame,
               window: int = 252) -> pd.Series:
    """Rolling mean of the daily rank correlation between factor and outcome.

    ``fwd_ret`` is percentile-ranked across columns per row; each row of
    ``factor_rank`` is then Pearson-correlated with the matching row of ranked
    returns. The per-row correlations are averaged over ``window`` rows,
    requiring at least ``window // 2`` observations.
    """
    outcome_rank = fwd_ret.rank(axis=1, pct=True, na_option="keep")
    daily_ic = factor_rank.corrwith(outcome_rank, axis=1)
    smoother = daily_ic.rolling(window, min_periods=window // 2)
    return smoother.mean()
|
||
|
||
|
||
def _rolling_ls_sharpe(factor_rank: pd.DataFrame,
                       prices: pd.DataFrame,
                       window: int = 252,
                       rebal: int = 21,
                       tcost: float = 0.001) -> pd.Series:
    """
    Rolling annualised Sharpe of a top-decile-long / bottom-decile-short book.

    Names with ``factor_rank >= 0.9`` form the long leg and names with
    ``factor_rank <= 0.1`` the short leg, equally weighted within each leg.
    Legs are re-formed every ``rebal`` rows (starting at row 0) and held in
    between. Each row's return uses the previous row's weights, and a cost of
    ``tcost`` times absolute weight change is deducted from both legs.

    The Sharpe at a row is mean/std of the net daily return over the trailing
    ``window`` rows (at least ``window // 2`` observations), scaled by
    sqrt(252).
    """
    # True on the rows where the book is re-formed.
    is_rebal_row = pd.Series(False, index=factor_rank.index)
    is_rebal_row.iloc[::rebal] = True

    def _held_leg(members):
        # Equal weight across members; an empty leg yields NaN for that row.
        head_count = members.sum(axis=1).replace(0, np.nan)
        leg = members.astype(float).div(head_count, axis=0)
        # Blank non-rebalance rows, then carry the last formed leg forward.
        leg[~is_rebal_row] = np.nan
        return leg.ffill().fillna(0.0)

    long_w = _held_leg(factor_rank >= 0.9)
    short_w = _held_leg(factor_rank <= 0.1)

    daily = prices.pct_change(fill_method=None)
    long_ret = (long_w.shift(1) * daily).sum(axis=1)
    short_ret = (short_w.shift(1) * daily).sum(axis=1)

    long_turn = long_w.diff().abs().sum(axis=1).fillna(0.0)
    short_turn = short_w.diff().abs().sum(axis=1).fillna(0.0)

    net = (long_ret - short_ret) - (long_turn + short_turn) * tcost
    net = net.fillna(0.0)

    trailing = net.rolling(window, min_periods=window // 2)
    avg = trailing.mean()
    dev = trailing.std()
    return (avg / dev) * np.sqrt(252)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Strategy
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class AlphaFactorStrategy(Strategy):
    """
    Multi-factor long-only with rolling LS-Sharpe-weighted signal blend,
    inverse-vol weighting, and portfolio-level volatility targeting.

    Why LS-Sharpe and not IC?
      IC (rank-forward correlation) measures directional accuracy but ignores
      the magnitude of cross-sectional dispersion. Two factors with identical
      IC can have very different P&L. Empirically on this sample rev_1m has
      IC t-stat +5 but LS Sharpe -12 — its top decile are freshly crashed
      names that keep crashing. We weight by a lagged 252d rolling LS-Sharpe
      (top-decile minus bottom-decile, monthly rebalance, 10bps t-cost) and
      floor weights at zero so demoted factors simply drop out.

    The strategy requires a market return series (e.g. SPY pct_change) passed
    at construction time — it is NOT derived from data inside generate_signals,
    because the cross-sectional universe contains only selected tickers while
    we want a stable market benchmark for beta/residual computations.

    Parameters
    ----------
    mkt_returns
        Market return series; reindexed onto the price index by the beta and
        residual-momentum helpers.
    top_n
        Number of names held after each rebalance.
    rebal_freq
        Rows between portfolio rebalances.
    vol_window
        Rolling window (rows) for beta / residual vol, per-name realised vol
        and the portfolio vol estimate.
    vol_target_annual
        Annualised portfolio volatility target; ``None`` disables targeting.
    ic_window
        Rolling window (rows) of the factor-quality measure.
    exposure_clip
        ``(low, high)`` bounds applied to the volatility-targeting scale.
    fwd_window
        Forward-return horizon for the IC scheme and the lag applied to every
        factor-weight series.
    weight_scheme
        One of ``"ls_sharpe"``, ``"ic"`` or ``"equal"``.
    min_weight
        Factor weights that are not strictly greater than this value are set
        to zero; ``None`` disables the floor.
    """

    def __init__(
        self,
        mkt_returns: pd.Series,
        top_n: int = 15,
        rebal_freq: int = 10,
        vol_window: int = 60,
        vol_target_annual: float | None = 0.18,
        ic_window: int = 252,
        exposure_clip: tuple[float, float] = (0.30, 1.50),
        fwd_window: int = 21,
        weight_scheme: str = "ls_sharpe",   # {"ls_sharpe", "ic", "equal"}
        min_weight: float = 0.0,            # floor per-factor weight (0 = drop losers)
    ):
        self.mkt_returns = mkt_returns
        self.top_n = top_n
        self.rebal_freq = rebal_freq
        self.vol_window = vol_window
        self.vol_target_annual = vol_target_annual
        self.ic_window = ic_window
        self.exposure_clip = exposure_clip
        self.fwd_window = fwd_window
        self.weight_scheme = weight_scheme
        self.min_weight = min_weight

    # ---- Factor matrix ----
    def compute_factors(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]:
        """Return every raw factor as a ``{name: DataFrame}`` mapping.

        Betas and residual vols are estimated once over ``vol_window`` rows
        and shared by the residual-momentum, idiosyncratic-vol and low-beta
        factors.
        """
        betas, resid_vol = _rolling_beta_and_residvol(
            data, self.mkt_returns, window=self.vol_window)
        factors = {
            "mom_12_1": f_mom_12_1(data),
            "mom_7_1": f_mom_7_1(data),
            "mom_residual": f_mom_residual(data, self.mkt_returns, betas=betas),
            "rev_1m": f_rev_1m(data),
            "w52_high": f_w52_high(data),
            "max5_neg": f_max5_neg(data),
            "recovery_63": f_recovery_63(data),
            "trend_strength": f_trend_strength(data),
            "idio_vol_neg": -resid_vol,
            "low_beta": -betas,
        }
        return factors

    # ---- Full pipeline ----
    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Turn a price panel into lagged target portfolio weights.

        Steps: rank factors cross-sectionally, blend them with lagged
        factor-quality weights, pick the ``top_n`` composite scores,
        inverse-vol weight them, hold between rebalances, optionally scale to
        the volatility target, and finally shift the weights by one row.

        Raises
        ------
        ValueError
            If ``weight_scheme`` is not one of the supported names.
        """
        factors = self.compute_factors(data)
        ranks = {k: xsec_rank(v) for k, v in factors.items()}

        # Per-factor quality weights. Each series is shifted by fwd_window
        # rows; for the IC scheme this means a row only sees forward returns
        # that have fully realised by then.
        if self.weight_scheme == "ic":
            fwd_ret = data.shift(-self.fwd_window) / data - 1
            weight_series = {
                k: rolling_ic(ranks[k], fwd_ret, window=self.ic_window).shift(self.fwd_window)
                for k in ranks
            }
        elif self.weight_scheme == "ls_sharpe":
            weight_series = {
                k: _rolling_ls_sharpe(ranks[k], data,
                                      window=self.ic_window,
                                      rebal=21, tcost=0.001).shift(self.fwd_window)
                for k in ranks
            }
        elif self.weight_scheme == "equal":
            weight_series = {k: pd.Series(1.0, index=ranks[k].index) for k in ranks}
        else:
            raise ValueError(f"unknown weight_scheme {self.weight_scheme!r}")

        # Weighted blend of factor ranks, normalised by the sum of |weights|.
        # A missing rank contributes 0 while its weight still counts in the
        # normaliser, so names lacking some factors score lower.
        composite = None
        weight_norm = None
        for k, rk in ranks.items():
            w = weight_series[k].reindex(rk.index).fillna(0.0)
            if self.min_weight is not None:
                w = w.where(w > self.min_weight, 0.0)
            contrib = rk.mul(w, axis=0)
            composite = contrib if composite is None else composite.add(contrib, fill_value=0.0)
            abs_w = w.abs()
            weight_norm = abs_w if weight_norm is None else weight_norm.add(abs_w, fill_value=0)
        weight_norm = weight_norm.replace(0, np.nan)
        composite = composite.div(weight_norm, axis=0)

        # Top-N selection; rows with fewer than top_n scored names stay flat.
        sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom")
        n_valid = composite.notna().sum(axis=1)
        enough = n_valid >= self.top_n
        top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1)

        # Inverse-vol weighting within top_n.
        rets = data.pct_change(fill_method=None)
        vol = rets.rolling(self.vol_window, min_periods=self.vol_window).std()
        inv_vol = (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0)
        row_sums = inv_vol.sum(axis=1).replace(0, np.nan)
        weights = inv_vol.div(row_sums, axis=0).fillna(0.0)

        # Rebalance schedule: trade every rebal_freq rows after the warm-up
        # and hold the last target in between.
        warmup = max(252, self.vol_window + 21, self.ic_window + self.fwd_window)
        rebal_mask = pd.Series(False, index=data.index)
        rebal_idx = list(range(warmup, len(data), self.rebal_freq))
        rebal_mask.iloc[rebal_idx] = True
        weights[~rebal_mask] = np.nan
        weights = weights.ffill().fillna(0.0)
        weights.iloc[:warmup] = 0.0

        # Volatility targeting at the portfolio level.
        if self.vol_target_annual is not None:
            # Returns use the previous row's weights, so the estimate is
            # trailing and has no lookahead.
            held = weights.shift(1)
            # Rows where nothing was held carry no information about
            # portfolio risk. Counting their zero P&L would drive the vol
            # estimate to 0 right after the warm-up and pin the scale at the
            # upper clip, so they are excluded from the estimate.
            invested = held.abs().sum(axis=1) > 0
            port_rets = (held * rets).sum(axis=1).where(invested)
            port_vol = port_rets.rolling(self.vol_window,
                                         min_periods=self.vol_window).std() * np.sqrt(252)
            # A zero vol estimate is treated as missing rather than as
            # infinite leverage headroom.
            scale = (self.vol_target_annual / port_vol.replace(0.0, np.nan)).clip(*self.exposure_clip)
            # Until an estimate exists run unlevered; afterwards carry the
            # last valid scale forward.
            scale = scale.ffill().fillna(1.0)
            weights = weights.mul(scale, axis=0)

        # One-row lag so a row's weights depend only on earlier information.
        return weights.shift(1).fillna(0.0)
|