"""
Alpha factor library — price-only, academically motivated, with a rolling
factor-quality combiner, inverse-vol portfolio weighting, and volatility
targeting.

Factors (each returns a cross-sectional DataFrame aligned to prices.index):

    mom_12_1        12-1 month momentum (Jegadeesh & Titman 1993).
    mom_7_1         Intermediate 7-1m momentum (Novy-Marx 2012).
    mom_residual    Market-residualized 12-1m (Blitz-Huij-Martens 2011).
    rev_1m          1-month reversal × -1 (Jegadeesh 1990 / short-term reversal).
    w52_high        Price / 52-week high, proximity factor (George & Hwang 2004).
    max5_neg        -avg(top-5 daily returns past 21d) — lottery/MAX
                    (Bali-Cakici-Whitelaw 2011).
    idio_vol_neg    -residual-vol from 60d market regression
                    (Ang-Hodrick-Xing-Zhang 2006).
    low_beta        -60d market beta (Betting Against Beta,
                    Frazzini-Pedersen 2014 variant).
    trend_strength  Slope / RMSE from 63d log-price regression.
    recovery_63     Price / 63d low - 1 (project-native, V-rebound proxy).

Combiner:
    - Cross-sectional percentile-rank each factor (NaN = keep).
    - For each day, blend the factor ranks with per-factor weights chosen by
      ``weight_scheme``:
        "ls_sharpe" (default)  rolling 252-day Sharpe of a top-decile minus
                               bottom-decile portfolio built on the factor;
        "ic"                   rolling 252-day Information Coefficient (rank
                               correlation vs forward 21d return);
        "equal"                constant weight of 1.
    - Weights are lagged by ``fwd_window`` (default 21) days to avoid
      lookahead. Factors whose weight is not above ``min_weight`` (default 0)
      get weight zero, i.e. they drop out; they are NOT sign-flipped.

Portfolio:
    - Rank composite score, pick top_n (default 15) on a rebal_freq schedule.
    - Inverse-vol weight within top_n (60d realized vol).
    - Volatility-target the whole portfolio to vol_target_annual (default 18%)
      using a trailing 60-day portfolio-vol estimate; exposure clipped to
      [0.3, 1.5].
    - Shift(1) at the end for T-1 signal delivery, matching the project
      convention.
"""
from __future__ import annotations

import numpy as np
import pandas as pd

from strategies.base import Strategy


# ---------------------------------------------------------------------------
# Factor primitives
# ---------------------------------------------------------------------------

def _pct(p, n):
    """Return the n-period percentage change of `p` without forward-filling gaps."""
    return p.pct_change(n, fill_method=None)


def f_mom_12_1(p):
    """12-1 momentum: 231-day return of the price series lagged by 21 days."""
    return _pct(p.shift(21), 231)


def f_mom_7_1(p):
    """7-1 momentum: 126-day return of the price series lagged by 21 days."""
    return _pct(p.shift(21), 126)


def f_rev_1m(p):
    """Short-term reversal: negated 21-day return."""
    return -_pct(p, 21)


def f_w52_high(p):
    """Distance to the rolling 252-day high (needs at least 200 observations)."""
    roll_max = p.rolling(252, min_periods=200).max()
    return p / roll_max - 1  # ≤0, closer to 0 = near 52w high


def _mean_top5(window):
    """Mean of the five largest finite values in `window`; NaN if fewer than five."""
    # np.sort places NaN last, so non-finite entries must be removed *before*
    # slicing the tail; otherwise a single missing return makes the result NaN.
    finite = window[np.isfinite(window)]
    if finite.size < 5:
        return np.nan
    return float(np.sort(finite)[-5:].mean())


def f_max5_neg(p):
    """Negated mean of the top-5 daily returns over the last 21 trading days.

    A window is evaluated once it holds at least 15 observations; missing
    returns inside the window are ignored rather than poisoning the result.
    """
    ret = p.pct_change(fill_method=None)
    top5 = ret.rolling(21, min_periods=15).apply(_mean_top5, raw=True)
    return -top5


def f_recovery_63(p):
    """Price relative to its rolling 63-day low, minus one (min 60 observations)."""
    return p / p.rolling(63, min_periods=60).min() - 1


def _rolling_weighted_sum(frame, w):
    """Σ_{k=0..n-1} w[k] * y[t-(n-1)+k] for each column of `frame`, n = len(w).

    Rows before the first full window are NaN. If `frame` has fewer than n
    rows no window is complete and an all-NaN frame is returned.
    """
    n = len(w)
    arr = frame.to_numpy(dtype=float)
    out = np.full(arr.shape, np.nan, dtype=float)
    if arr.shape[0] >= n:
        # np.convolve flips its kernel, so passing the reversed weights makes
        # it a plain sliding dot product (cross-correlation).
        kernel = w[::-1]
        for k in range(arr.shape[1]):
            out[n - 1:, k] = np.convolve(arr[:, k], kernel, mode="valid")
    return pd.DataFrame(out, index=frame.index, columns=frame.columns)


def f_trend_strength(p):
    """
    Vectorized log-price trend strength: rolling OLS slope ÷ residual RMSE on
    a 63-day window. t-stat-like measure of directional trend quality.

    Zero prices are treated as missing. Any window containing a missing
    log-price yields NaN.
    """
    n = 63
    logp = np.log(p.replace(0, np.nan))

    idx = np.arange(n, dtype=float)
    idx_c = idx - idx.mean()          # centred time index, sums to zero
    idx_var = (idx_c ** 2).sum()

    # Numerator of the OLS slope: Σ (i-ī)(y-ȳ) = Σ (i-ī)·y because Σ (i-ī) = 0.
    # NaNs are zero-filled only so the convolution stays finite; affected
    # windows are removed by the `valid` mask below.
    num = _rolling_weighted_sum(logp.fillna(0.0), idx_c)
    slope = num / idx_var

    # Residual variance: Σ(y - ȳ)² / n - slope² * idx_var / n
    var_y = logp.rolling(n, min_periods=n).var(ddof=0)
    resid_var = (var_y - (slope ** 2) * idx_var / n).clip(lower=1e-18)
    rmse = np.sqrt(resid_var)
    ts = slope / rmse

    # Keep only rows whose window had all n observations present.
    valid = logp.rolling(n, min_periods=n).count() == n
    return ts.where(valid)


def _rolling_beta_and_residvol(p, mkt_ret, window=60):
    """Return (beta, residual_vol) DataFrames aligned to prices.index.

    beta is rolling cov(stock, market) / var(market) over `window` days;
    residual vol is sqrt(var(stock) - beta² · var(market)), floored at zero.
    """
    ret = p.pct_change(fill_method=None)
    mkt = mkt_ret.reindex(p.index)
    # The market variance does not depend on the stock: compute it once.
    mkt_var = mkt.rolling(window, min_periods=window).var()

    betas = {}
    resid_vols = {}
    for col in ret.columns:
        roll = ret[col].rolling(window, min_periods=window)
        beta = roll.cov(mkt) / mkt_var
        # Residual vol via: var(stock) - beta^2 * var(mkt) (simplification)
        resid_var = (roll.var() - beta ** 2 * mkt_var).clip(lower=0)
        betas[col] = beta
        resid_vols[col] = np.sqrt(resid_var)

    # Passing the index keeps the date axis even when there are no columns.
    return (
        pd.DataFrame(betas, index=ret.index),
        pd.DataFrame(resid_vols, index=ret.index),
    )


def f_mom_residual(p, mkt_ret, betas=None, window=60):
    """12-1m residual momentum: cum stock return - beta * cum market return.

    `betas` may be supplied to avoid recomputing them; otherwise they are
    estimated with `_rolling_beta_and_residvol` over `window` days.
    """
    if betas is None:
        betas, _ = _rolling_beta_and_residvol(p, mkt_ret, window=window)

    # Reindex mkt_ret to p.index so arithmetic below does not produce a union
    # index (which would corrupt downstream shape assumptions).
    mkt_aligned = mkt_ret.reindex(p.index)

    stock_cum = _pct(p.shift(21), 231)
    # Compound the market over the same 231 daily returns, then apply the
    # same 21-day skip as the stock leg.
    mkt_cum_ret = (1 + mkt_aligned).rolling(231).apply(
        lambda x: np.prod(x) - 1, raw=True
    )
    mkt_cum = mkt_cum_ret.shift(21)

    out = stock_cum - betas.mul(mkt_cum, axis=0)
    return out.reindex(p.index)


# ---------------------------------------------------------------------------
# Cross-sectional rank helper
# ---------------------------------------------------------------------------

def xsec_rank(df: pd.DataFrame) -> pd.DataFrame:
    """Percentile-rank each row across columns; NaN entries stay NaN."""
    return df.rank(axis=1, pct=True, na_option="keep")


# ---------------------------------------------------------------------------
# Rolling IC computation
# ---------------------------------------------------------------------------

def rolling_ic(factor_rank: pd.DataFrame, fwd_ret: pd.DataFrame,
               window: int = 252) -> pd.Series:
    """Daily rank IC = corr(rank(factor), rank(fwd_ret)); rolling mean.

    The rolling mean needs at least `window // 2` daily observations.
    """
    fr = fwd_ret.rank(axis=1, pct=True, na_option="keep")
    # Per-day Pearson corr of the two rank frames. This equals Spearman when
    # both frames are non-NaN for the same names on a given day; otherwise it
    # is an approximation, since ranks are not recomputed on the common set.
    per_day_ic = factor_rank.corrwith(fr, axis=1)
    return per_day_ic.rolling(window, min_periods=window // 2).mean()


def _rolling_ls_sharpe(factor_rank: pd.DataFrame, prices: pd.DataFrame,
                       window: int = 252, rebal: int = 21,
                       tcost: float = 0.001) -> pd.Series:
    """
    Rolling realized Sharpe of a long-top-decile / short-bottom-decile
    portfolio constructed on `factor_rank`, rebalanced every `rebal` trading
    days, with proportional turnover cost `tcost`. Used as a factor-quality
    weight.

    Returned series is aligned to `prices.index` and the Sharpe at day t is
    computed from returns over [t-window, t]. Days on which the portfolio
    neither holds nor trades anything are excluded from the statistics, so the
    result is NaN until `window // 2` live days are available.
    """
    long_mask = factor_rank >= 0.9
    short_mask = factor_rank <= 0.1

    # Rebalance: hold the weights constant between rebal dates.
    rebal_mask = pd.Series(False, index=factor_rank.index)
    rebal_mask.iloc[::rebal] = True

    def held_weights(mask):
        """Equal-weight the names in `mask` on rebal dates, carried forward."""
        counts = mask.sum(axis=1).replace(0, np.nan)
        w = mask.astype(float).div(counts, axis=0)
        w.loc[~rebal_mask, :] = np.nan
        # A rebal date with no eligible name stays NaN and therefore inherits
        # the previous holdings through the forward fill.
        return w.ffill().fillna(0.0)

    long_w = held_weights(long_mask)
    short_w = held_weights(short_mask)

    rets = prices.pct_change(fill_method=None)
    long_prev = long_w.shift(1)
    short_prev = short_w.shift(1)
    gross = (long_prev * rets).sum(axis=1) - (short_prev * rets).sum(axis=1)
    turnover = (long_w.diff().abs().sum(axis=1)
                + short_w.diff().abs().sum(axis=1))
    ls_ret = gross - turnover * tcost

    # Before the factor has data the book is empty and the "return" is a
    # structural 0.0. Counting those days would satisfy min_periods with no
    # real observations and yield a Sharpe driven by a handful of live days.
    held = long_prev.sum(axis=1) + short_prev.sum(axis=1)
    live = (held > 0) | (turnover > 0)
    ls_ret = ls_ret.where(live)

    min_obs = window // 2
    mean = ls_ret.rolling(window, min_periods=min_obs).mean()
    # Zero dispersion carries no information; avoid emitting ±inf.
    std = ls_ret.rolling(window, min_periods=min_obs).std().replace(0, np.nan)
    return (mean / std) * np.sqrt(252)


# ---------------------------------------------------------------------------
# Strategy
# ---------------------------------------------------------------------------

class AlphaFactorStrategy(Strategy):
    """
    Multi-factor long-only with rolling LS-Sharpe-weighted signal blend,
    inverse-vol weighting, and portfolio-level volatility targeting.

    Why LS-Sharpe and not IC? IC (rank-forward correlation) measures
    directional accuracy but ignores the magnitude of cross-sectional
    dispersion. Two factors with identical IC can have very different P&L.
    Empirically on this sample rev_1m has IC t-stat +5 but LS Sharpe -12 —
    its top decile are freshly crashed names that keep crashing. We weight by
    a lagged 252d rolling LS-Sharpe (top-decile minus bottom-decile, monthly
    rebalance, 10bps t-cost) and floor weights at zero so demoted factors
    simply drop out.

    The strategy requires a market return series (e.g. SPY pct_change) passed
    at construction time — it is NOT derived from data inside
    generate_signals, because the cross-sectional universe contains only
    selected tickers while we want a stable market benchmark for
    beta/residual computations.
    """

    def __init__(
        self,
        mkt_returns: pd.Series,
        top_n: int = 15,
        rebal_freq: int = 10,
        vol_window: int = 60,
        vol_target_annual: float | None = 0.18,
        ic_window: int = 252,
        exposure_clip: tuple[float, float] = (0.30, 1.50),
        fwd_window: int = 21,
        weight_scheme: str = "ls_sharpe",   # {"ls_sharpe", "ic", "equal"}
        min_weight: float = 0.0,            # floor per-factor weight (0 = drop losers)
    ):
        self.mkt_returns = mkt_returns
        self.top_n = top_n
        self.rebal_freq = rebal_freq
        self.vol_window = vol_window
        self.vol_target_annual = vol_target_annual
        self.ic_window = ic_window
        self.exposure_clip = exposure_clip
        self.fwd_window = fwd_window
        self.weight_scheme = weight_scheme
        self.min_weight = min_weight

    # ---- Factor matrix ----
    def compute_factors(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]:
        """Compute every raw factor frame from the price frame `data`.

        Betas and residual vols are estimated once (window = `vol_window`)
        and shared by the residual-momentum, idio-vol and low-beta factors.
        """
        betas, resid_vol = _rolling_beta_and_residvol(
            data, self.mkt_returns, window=self.vol_window)
        factors = {
            "mom_12_1": f_mom_12_1(data),
            "mom_7_1": f_mom_7_1(data),
            "mom_residual": f_mom_residual(data, self.mkt_returns, betas=betas),
            "rev_1m": f_rev_1m(data),
            "w52_high": f_w52_high(data),
            "max5_neg": f_max5_neg(data),
            "recovery_63": f_recovery_63(data),
            "trend_strength": f_trend_strength(data),
            "idio_vol_neg": -resid_vol,
            "low_beta": -betas,
        }
        return factors

    # ---- Full pipeline ----
    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Turn a price frame into lagged target portfolio weights.

        Pipeline: factors → cross-sectional ranks → weighted composite →
        top_n selection → inverse-vol weights → rebalance schedule →
        optional volatility targeting → shift(1).

        Raises:
            ValueError: if `weight_scheme` is not "ls_sharpe", "ic" or "equal".
        """
        factors = self.compute_factors(data)
        ranks = {k: xsec_rank(v) for k, v in factors.items()}

        # Per-factor time series of blend weights, lagged by fwd_window so a
        # weight used on day t only reflects information available by day t.
        if self.weight_scheme == "ic":
            fwd_ret = data.shift(-self.fwd_window) / data - 1
            weight_series = {
                k: rolling_ic(ranks[k], fwd_ret,
                              window=self.ic_window).shift(self.fwd_window)
                for k in ranks
            }
        elif self.weight_scheme == "ls_sharpe":
            weight_series = {
                k: _rolling_ls_sharpe(ranks[k], data, window=self.ic_window,
                                      rebal=21,
                                      tcost=0.001).shift(self.fwd_window)
                for k in ranks
            }
        elif self.weight_scheme == "equal":
            weight_series = {k: pd.Series(1.0, index=ranks[k].index)
                             for k in ranks}
        else:
            raise ValueError(f"unknown weight_scheme {self.weight_scheme!r}")

        composite = None
        weight_norm = None
        for k, rk in ranks.items():
            w = weight_series[k].reindex(rk.index).fillna(0.0)
            if self.min_weight is not None:
                # Factors at or below the floor are dropped, not sign-flipped.
                w = w.where(w > self.min_weight, 0.0)
            contrib = rk.mul(w, axis=0)
            # NOTE: a name missing this factor contributes 0 here while the
            # factor's weight still enters the row-level denominator below,
            # so names with missing factors are scored lower.
            composite = (contrib if composite is None
                         else composite.add(contrib, fill_value=0.0))
            abs_w = w.abs()
            weight_norm = (abs_w if weight_norm is None
                           else weight_norm.add(abs_w, fill_value=0))
        weight_norm = weight_norm.replace(0, np.nan)
        composite = composite.div(weight_norm, axis=0)

        # Top-N selection.
        sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom")
        n_valid = composite.notna().sum(axis=1)
        enough = n_valid >= self.top_n
        top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1)

        # Inverse-vol weighting within top_n.
        rets = data.pct_change(fill_method=None)
        vol = rets.rolling(self.vol_window, min_periods=self.vol_window).std()
        inv_vol = (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0)
        row_sums = inv_vol.sum(axis=1).replace(0, np.nan)
        weights = inv_vol.div(row_sums, axis=0).fillna(0.0)

        # Rebalance schedule: every rebal_freq rows starting at the end of
        # the warm-up; weights are held constant in between.
        warmup = max(252, self.vol_window + 21,
                     self.ic_window + self.fwd_window)
        rebal_mask = pd.Series(False, index=data.index)
        rebal_mask.iloc[warmup::self.rebal_freq] = True
        weights.loc[~rebal_mask, :] = np.nan
        weights = weights.ffill().fillna(0.0)
        weights.iloc[:warmup] = 0.0

        # Volatility targeting at the portfolio level.
        if self.vol_target_annual is not None:
            # Use returns of the *current* weight vector; vol is trailing
            # realized on the applied weights so no lookahead. Compute after
            # ffill.
            held = weights.shift(1)
            port_rets = (held * rets).sum(axis=1)
            # Uninvested days (e.g. the warm-up) produce structural zeros. If
            # they entered the estimate, the measured vol right after warm-up
            # would be near zero and exposure would sit at the upper clip.
            invested = held.abs().sum(axis=1) > 0
            port_rets = port_rets.where(invested)
            port_vol = port_rets.rolling(
                self.vol_window, min_periods=self.vol_window
            ).std() * np.sqrt(252)
            scale = (self.vol_target_annual / port_vol).clip(*self.exposure_clip)
            # Until an estimate exists, run unlevered (scale = 1).
            scale = scale.ffill().fillna(1.0)
            weights = weights.mul(scale, axis=0)

        return weights.shift(1).fillna(0.0)