Files
quant/research/alpha_factors.py

359 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Alpha factor library — price-only, academically motivated, with a rolling-IC
combiner, inverse-vol portfolio weighting, and volatility targeting.
Factors (each returns a cross-sectional DataFrame aligned to prices.index):
mom_12_1 12-1 month momentum (Jegadeesh & Titman 1993).
mom_7_1 Intermediate 7-1m momentum (Novy-Marx 2012).
mom_residual Market-residualized 12-1m (Blitz-Huij-Martens 2011).
rev_1m 1-month reversal × -1 (Jegadeesh 1990 / short-term reversal).
w52_high Price / 52-week high, proximity factor (George & Hwang 2004).
max5_neg -avg(top-5 daily returns past 21d) — lottery/MAX (Bali-Cakici-Whitelaw 2011).
idio_vol_neg -residual-vol from 60d market regression (Ang-Hodrick-Xing-Zhang 2006).
low_beta -60d market beta (Betting Against Beta, Frazzini-Pedersen 2014 variant).
trend_strength Slope / RMSE from 63d log-price regression.
recovery_63 Price / 63d low - 1 (project-native, V-rebound proxy).
Combiner:
- Cross-sectional percentile-rank each factor (NaN = keep).
- For each day, blend factors with weights proportional to the rolling
252-day Information Coefficient (Spearman rank corr vs forward 21d return).
- Weights are lagged by 21 days to avoid lookahead; negative-IC factors are
sign-flipped before weighting (so all contribute positively when confident).
Portfolio:
- Rank composite score, pick top_n (default 15) on a rebalance_freq schedule.
- Inverse-vol weight within top_n (60d realized vol).
- Volatility-target the whole portfolio to target_vol (default 18%) using a
trailing 60-day portfolio-vol estimate; exposure clipped to [0.3, 1.5].
- Shift(1) at the end for T-1 signal delivery, matching the project convention.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from strategies.base import Strategy
# ---------------------------------------------------------------------------
# Factor primitives
# ---------------------------------------------------------------------------
def _pct(p, n):
return p.pct_change(n, fill_method=None)
def f_mom_12_1(p):
return p.shift(21).pct_change(231, fill_method=None)
def f_mom_7_1(p):
return p.shift(21).pct_change(126, fill_method=None)
def f_rev_1m(p):
return -p.pct_change(21, fill_method=None)
def f_w52_high(p):
roll_max = p.rolling(252, min_periods=200).max()
return p / roll_max - 1 # ≤0, closer to 0 = near 52w high
def f_max5_neg(p):
ret = p.pct_change(fill_method=None)
# Mean of top-5 returns over the last 21 trading days; negate.
top5 = ret.rolling(21, min_periods=15).apply(
lambda x: np.mean(np.sort(x)[-5:]) if np.isfinite(x).sum() >= 5 else np.nan,
raw=True,
)
return -top5
def f_recovery_63(p):
return p / p.rolling(63, min_periods=60).min() - 1
def f_trend_strength(p):
"""
Vectorized log-price trend strength: rolling OLS slope ÷ residual RMSE on a
63-day window. t-stat-like measure of directional trend quality.
"""
logp = np.log(p.replace(0, np.nan))
n = 63
idx = np.arange(n, dtype=float)
idx_c = idx - idx.mean()
idx_var = (idx_c ** 2).sum()
# E[x·y] over the window: rolling sum of (idx·y) simplified via decomposition:
# Σ (i - ī)(y - ȳ) = Σ i·y - n·ī·ȳ (but ī is constant so just: Σ (i-ī)·y)
# We compute Σ (i-ī)·y as a rolling window-weighted sum.
weights = idx_c # shape (n,)
def rolling_weighted(series_df, w):
"""Σ_{k=0..n-1} w[k] * y[t-(n-1)+k] for each column, vectorized."""
arr = series_df.values
T, K = arr.shape
out = np.full_like(arr, np.nan, dtype=float)
# Convolution across time axis per column:
for k in range(K):
col = arr[:, k]
# Use np.convolve with reversed weights (equivalent to correlate)
conv = np.convolve(col, w[::-1], mode="valid")
out[n - 1:, k] = conv
return pd.DataFrame(out, index=series_df.index, columns=series_df.columns)
# rolling mean and var for log-price
roll_mean = logp.rolling(n, min_periods=n).mean()
# numerator: Σ (i-ī)(y - ȳ) = Σ (i-ī)·y (since Σ(i-ī) = 0)
num = rolling_weighted(logp.fillna(0.0), weights)
slope = num / idx_var
# Residual variance: Σ(y - ȳ)² / n - slope² * idx_var / n
var_y = logp.rolling(n, min_periods=n).var(ddof=0)
resid_var = (var_y - (slope ** 2) * idx_var / n).clip(lower=1e-18)
rmse = np.sqrt(resid_var)
ts = slope / rmse
# mask rows where the window contained any NaN
valid = logp.rolling(n, min_periods=n).count() == n
return ts.where(valid)
def _rolling_beta_and_residvol(p, mkt_ret, window=60):
"""Return (beta, residual_vol) DataFrames aligned to prices.index."""
ret = p.pct_change(fill_method=None)
mkt = mkt_ret.reindex(p.index)
def pair(stock_ret):
cov = stock_ret.rolling(window, min_periods=window).cov(mkt)
var = mkt.rolling(window, min_periods=window).var()
beta = cov / var
# Residual vol via: var(stock) - beta^2 * var(mkt) (simplification)
var_stock = stock_ret.rolling(window, min_periods=window).var()
resid_var = (var_stock - beta ** 2 * var) .clip(lower=0)
resid_vol = np.sqrt(resid_var)
return beta, resid_vol
betas = {}
resid_vols = {}
for col in ret.columns:
b, rv = pair(ret[col])
betas[col] = b
resid_vols[col] = rv
return pd.DataFrame(betas), pd.DataFrame(resid_vols)
def f_mom_residual(p, mkt_ret, betas=None, window=60):
if betas is None:
betas, _ = _rolling_beta_and_residvol(p, mkt_ret, window=window)
# 12-1m cumulative residual return = cum stock ret - beta * cum mkt ret.
# Reindex mkt_ret to p.index so arithmetic below does not produce a union
# index (which would corrupt downstream shape assumptions).
mkt_aligned = mkt_ret.reindex(p.index)
stock_cum = p.shift(21).pct_change(231, fill_method=None)
mkt_cum_ret = (1 + mkt_aligned).rolling(231).apply(lambda x: np.prod(x) - 1, raw=True)
mkt_cum = mkt_cum_ret.shift(21)
out = stock_cum.sub(betas.mul(mkt_cum, axis=0), fill_value=np.nan)
return out.reindex(p.index)
# ---------------------------------------------------------------------------
# Cross-sectional rank helper
# ---------------------------------------------------------------------------
def xsec_rank(df: pd.DataFrame) -> pd.DataFrame:
return df.rank(axis=1, pct=True, na_option="keep")
# ---------------------------------------------------------------------------
# Rolling IC computation
# ---------------------------------------------------------------------------
def rolling_ic(factor_rank: pd.DataFrame, fwd_ret: pd.DataFrame,
window: int = 252) -> pd.Series:
"""Daily Spearman IC = rank(factor) vs rank(fwd_ret); rolling mean."""
fr = fwd_ret.rank(axis=1, pct=True, na_option="keep")
# Per-day pearson corr of rank-transformed ≡ Spearman.
per_day_ic = factor_rank.corrwith(fr, axis=1)
return per_day_ic.rolling(window, min_periods=window // 2).mean()
def _rolling_ls_sharpe(factor_rank: pd.DataFrame,
prices: pd.DataFrame,
window: int = 252,
rebal: int = 21,
tcost: float = 0.001) -> pd.Series:
"""
Rolling realized Sharpe of a long-top-decile / short-bottom-decile portfolio
constructed on `factor_rank`, rebalanced every `rebal` trading days, with
proportional turnover cost `tcost`. Used as a factor-quality weight.
Returned series is aligned to `prices.index` and the Sharpe at day t is
computed from returns over [t-window, t].
"""
long_mask = factor_rank >= 0.9
short_mask = factor_rank <= 0.1
# Rebalance: hold the mask constant between rebal dates
rebal_mask = pd.Series(False, index=factor_rank.index)
rebal_mask.iloc[::rebal] = True
long_w = long_mask.astype(float).div(long_mask.sum(axis=1).replace(0, np.nan), axis=0)
short_w = short_mask.astype(float).div(short_mask.sum(axis=1).replace(0, np.nan), axis=0)
long_w[~rebal_mask] = np.nan
short_w[~rebal_mask] = np.nan
long_w = long_w.ffill().fillna(0.0)
short_w = short_w.ffill().fillna(0.0)
rets = prices.pct_change(fill_method=None)
long_ret = (long_w.shift(1) * rets).sum(axis=1)
short_ret = (short_w.shift(1) * rets).sum(axis=1)
long_turn = long_w.diff().abs().sum(axis=1).fillna(0.0)
short_turn = short_w.diff().abs().sum(axis=1).fillna(0.0)
ls_ret = (long_ret - short_ret) - (long_turn + short_turn) * tcost
ls_ret = ls_ret.fillna(0.0)
mean = ls_ret.rolling(window, min_periods=window // 2).mean()
std = ls_ret.rolling(window, min_periods=window // 2).std()
sharpe = (mean / std) * np.sqrt(252)
return sharpe
# ---------------------------------------------------------------------------
# Strategy
# ---------------------------------------------------------------------------
class AlphaFactorStrategy(Strategy):
"""
Multi-factor long-only with rolling LS-Sharpe-weighted signal blend,
inverse-vol weighting, and portfolio-level volatility targeting.
Why LS-Sharpe and not IC?
IC (rank-forward correlation) measures directional accuracy but ignores
the magnitude of cross-sectional dispersion. Two factors with identical
IC can have very different P&L. Empirically on this sample rev_1m has
IC t-stat +5 but LS Sharpe -12 — its top decile are freshly crashed
names that keep crashing. We weight by a lagged 252d rolling LS-Sharpe
(top-decile minus bottom-decile, monthly rebalance, 10bps t-cost) and
floor weights at zero so demoted factors simply drop out.
The strategy requires a market return series (e.g. SPY pct_change) passed
at construction time — it is NOT derived from data inside generate_signals,
because the cross-sectional universe contains only selected tickers while
we want a stable market benchmark for beta/residual computations.
"""
def __init__(
self,
mkt_returns: pd.Series,
top_n: int = 15,
rebal_freq: int = 10,
vol_window: int = 60,
vol_target_annual: float | None = 0.18,
ic_window: int = 252,
exposure_clip: tuple[float, float] = (0.30, 1.50),
fwd_window: int = 21,
weight_scheme: str = "ls_sharpe", # {"ls_sharpe", "ic", "equal"}
min_weight: float = 0.0, # floor per-factor weight (0 = drop losers)
):
self.mkt_returns = mkt_returns
self.top_n = top_n
self.rebal_freq = rebal_freq
self.vol_window = vol_window
self.vol_target_annual = vol_target_annual
self.ic_window = ic_window
self.exposure_clip = exposure_clip
self.fwd_window = fwd_window
self.weight_scheme = weight_scheme
self.min_weight = min_weight
# ---- Factor matrix ----
def compute_factors(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]:
betas, resid_vol = _rolling_beta_and_residvol(
data, self.mkt_returns, window=self.vol_window)
factors = {
"mom_12_1": f_mom_12_1(data),
"mom_7_1": f_mom_7_1(data),
"mom_residual": f_mom_residual(data, self.mkt_returns, betas=betas),
"rev_1m": f_rev_1m(data),
"w52_high": f_w52_high(data),
"max5_neg": f_max5_neg(data),
"recovery_63": f_recovery_63(data),
"trend_strength": f_trend_strength(data),
"idio_vol_neg": -resid_vol,
"low_beta": -betas,
}
return factors
# ---- Full pipeline ----
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
factors = self.compute_factors(data)
ranks = {k: xsec_rank(v) for k, v in factors.items()}
if self.weight_scheme == "ic":
fwd_ret = data.shift(-self.fwd_window) / data - 1
weight_series = {
k: rolling_ic(ranks[k], fwd_ret, window=self.ic_window).shift(self.fwd_window)
for k in ranks
}
elif self.weight_scheme == "ls_sharpe":
weight_series = {
k: _rolling_ls_sharpe(ranks[k], data,
window=self.ic_window,
rebal=21, tcost=0.001).shift(self.fwd_window)
for k in ranks
}
elif self.weight_scheme == "equal":
weight_series = {k: pd.Series(1.0, index=ranks[k].index) for k in ranks}
else:
raise ValueError(f"unknown weight_scheme {self.weight_scheme!r}")
composite = None
weight_norm = None
for k, rk in ranks.items():
w = weight_series[k].reindex(rk.index).fillna(0.0)
if self.min_weight is not None:
w = w.where(w > self.min_weight, 0.0)
contrib = rk.mul(w, axis=0)
composite = contrib if composite is None else composite.add(contrib, fill_value=0.0)
abs_w = w.abs()
weight_norm = abs_w if weight_norm is None else weight_norm.add(abs_w, fill_value=0)
weight_norm = weight_norm.replace(0, np.nan)
composite = composite.div(weight_norm, axis=0)
# Top-N selection.
sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom")
n_valid = composite.notna().sum(axis=1)
enough = n_valid >= self.top_n
top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1)
# Inverse-vol weighting within top_n.
rets = data.pct_change(fill_method=None)
vol = rets.rolling(self.vol_window, min_periods=self.vol_window).std()
inv_vol = (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0)
row_sums = inv_vol.sum(axis=1).replace(0, np.nan)
weights = inv_vol.div(row_sums, axis=0).fillna(0.0)
# Rebalance schedule.
warmup = max(252, self.vol_window + 21, self.ic_window + self.fwd_window)
rebal_mask = pd.Series(False, index=data.index)
rebal_idx = list(range(warmup, len(data), self.rebal_freq))
rebal_mask.iloc[rebal_idx] = True
weights[~rebal_mask] = np.nan
weights = weights.ffill().fillna(0.0)
weights.iloc[:warmup] = 0.0
# Volatility targeting at the portfolio level.
if self.vol_target_annual is not None:
# Use returns of the *current* weight vector; vol is trailing realized
# on the applied weights so no lookahead. Compute after ffill.
port_rets = (weights.shift(1) * rets).sum(axis=1)
port_vol = port_rets.rolling(self.vol_window,
min_periods=self.vol_window).std() * np.sqrt(252)
scale = (self.vol_target_annual / port_vol).clip(*self.exposure_clip)
scale = scale.fillna(method="ffill").fillna(1.0)
weights = weights.mul(scale, axis=0)
return weights.shift(1).fillna(0.0)