""" Interaction / multiplicative factor strategy. Rationale: in the 10y PIT diagnostics, each single-factor top decile clocks ~0.5–0.8 Sharpe, yet the production Recovery+Mom Top10 delivers 0.92. The extra alpha comes from an AND-style interaction — stocks that rank high on BOTH factors simultaneously. Linear rank-blending loses this because a stock can make top_n by being middling on many factors instead of extreme on a few. This module provides: * `MultiplicativeFactorStrategy` — picks top_n stocks by the geometric mean (equivalently the product) of cross-sectional factor ranks. Concentrates on consensus winners. * `VotingFactorStrategy` — counts how many factors place a stock in its top `vote_pct`; selects stocks clearing a minimum vote threshold. Breaks ties by the sum of ranks. Robust when factor ICs drift. * `SubStrategyEnsemble` — equal-weight blend of Recovery+Mom Top10, fc_up_cap+mom_gap monthly, and a new Multiplicative("mom × recovery × idio_vol_neg") sleeve. Diversifies across independent alpha sources rather than across factor primitives. """ from __future__ import annotations import numpy as np import pandas as pd from research.alpha_factors import (_rolling_beta_and_residvol, f_mom_12_1, f_mom_7_1, f_rev_1m, f_w52_high, f_max5_neg, f_recovery_63, f_trend_strength, xsec_rank, f_mom_residual) from strategies.base import Strategy from strategies.factor_combo import FactorComboStrategy from strategies.recovery_momentum import RecoveryMomentumStrategy # --------------------------------------------------------------------------- # Multiplicative top-N # --------------------------------------------------------------------------- class MultiplicativeFactorStrategy(Strategy): """ Top-N by product of selected factor ranks (equivalent to rank-geometric-mean). Parameters ---------- factor_names : list[str] Keys into the factor library. Supported: mom_12_1, mom_7_1, mom_residual, recovery_63, w52_high, idio_vol_neg, mom_x_recovery (shortcut pair). top_n : int Number of stocks. rebal_freq : int Rebal interval in trading days. mkt_returns : pd.Series | None Required for mom_residual / idio_vol_neg. """ def __init__(self, factor_names: list[str], top_n: int = 10, rebal_freq: int = 21, mkt_returns: pd.Series | None = None, weighting: str = "equal", signal_concentration: float = 0.0, dispersion_scale: bool = False): """ Parameters ---------- signal_concentration : float Exponent applied to composite score when weighting=='signal'. 0 → equal weight within top_n; higher → more weight on top ranks. dispersion_scale : bool Scale total exposure by z-scored cross-sectional rank dispersion, clipped to [0.5, 1.3]. Expands in high-dispersion regimes. """ self.factor_names = factor_names self.top_n = top_n self.rebal_freq = rebal_freq self.mkt_returns = mkt_returns self.weighting = weighting self.signal_concentration = signal_concentration self.dispersion_scale = dispersion_scale def _build(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]: betas, resid_vol = (None, None) if any(f in ("mom_residual", "idio_vol_neg", "low_beta") for f in self.factor_names): if self.mkt_returns is None: raise ValueError("mkt_returns required for beta-based factors") betas, resid_vol = _rolling_beta_and_residvol(data, self.mkt_returns, 60) lib = { "mom_12_1": lambda: f_mom_12_1(data), "mom_7_1": lambda: f_mom_7_1(data), "mom_residual": lambda: f_mom_residual(data, self.mkt_returns, betas=betas), "recovery_63": lambda: f_recovery_63(data), "w52_high": lambda: f_w52_high(data), "idio_vol_neg": lambda: -resid_vol, "low_beta": lambda: -betas, "trend": lambda: f_trend_strength(data), } return {n: lib[n]() for n in self.factor_names} def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: factors = self._build(data) ranks = {n: xsec_rank(v) for n, v in factors.items()} # Product of ranks. If any rank is NaN, product is NaN → row excluded. composite = None for rk in ranks.values(): composite = rk if composite is None else composite.mul(rk, fill_value=np.nan) composite = composite.where(~rk.isna(), np.nan) sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom") n_valid = composite.notna().sum(axis=1) enough = n_valid >= self.top_n top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1) if self.weighting == "equal": raw = top_mask.astype(float) elif self.weighting == "inv_vol": vol = data.pct_change(fill_method=None).rolling(60).std() raw = (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0) elif self.weighting == "signal": # Weight ∝ composite^concentration, only among top_mask picks. score = composite.where(top_mask, 0.0).fillna(0.0) raw = score ** max(self.signal_concentration, 1.0) else: raise ValueError(f"bad weighting {self.weighting!r}") row_sums = raw.sum(axis=1).replace(0, np.nan) weights = raw.div(row_sums, axis=0).fillna(0.0) warmup = 252 rebal_mask = pd.Series(False, index=data.index) rebal_mask.iloc[list(range(warmup, len(data), self.rebal_freq))] = True weights[~rebal_mask] = np.nan weights = weights.ffill().fillna(0.0) weights.iloc[:warmup] = 0.0 if self.dispersion_scale: # Cross-sectional rank dispersion = daily std of composite. Scale # exposure up in high-dispersion regimes (alpha opportunity richer). disp = composite.std(axis=1) z = (disp - disp.rolling(252, min_periods=126).mean()) \ / disp.rolling(252, min_periods=126).std() scale = (1.0 + 0.3 * z.clip(-1, 1)).clip(0.5, 1.3) scale = scale.reindex(weights.index).fillna(1.0) weights = weights.mul(scale, axis=0) return weights.shift(1).fillna(0.0) # --------------------------------------------------------------------------- # Voting top-N # --------------------------------------------------------------------------- class VotingFactorStrategy(Strategy): """ Top-N by vote count: each factor contributes 1 vote if a stock is in its top `vote_pct` percentile. Select stocks with vote_count ≥ min_votes, break ties by sum of ranks. """ def __init__(self, factor_names: list[str], top_n: int = 10, rebal_freq: int = 21, vote_pct: float = 0.25, min_votes: int = 3, mkt_returns: pd.Series | None = None): self.factor_names = factor_names self.top_n = top_n self.rebal_freq = rebal_freq self.vote_pct = vote_pct self.min_votes = min_votes self.mkt_returns = mkt_returns def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: builder = MultiplicativeFactorStrategy( factor_names=self.factor_names, top_n=self.top_n, rebal_freq=self.rebal_freq, mkt_returns=self.mkt_returns) factors = builder._build(data) ranks = {n: xsec_rank(v) for n, v in factors.items()} thresh = 1 - self.vote_pct votes = sum((rk >= thresh).astype(float) for rk in ranks.values()) rank_sum = sum(rk.fillna(0) for rk in ranks.values()) # Primary sort: vote count; tiebreaker: rank_sum. Build a composite. composite = votes + rank_sum / (len(ranks) * 10) composite = composite.where(votes >= self.min_votes, np.nan) sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom") n_valid = composite.notna().sum(axis=1) enough = n_valid >= 1 effective_n = n_valid.clip(upper=self.top_n) top_mask = (sel_rank <= effective_n.values.reshape(-1, 1)) & enough.values.reshape(-1, 1) raw = top_mask.astype(float) row_sums = raw.sum(axis=1).replace(0, np.nan) weights = raw.div(row_sums, axis=0).fillna(0.0) warmup = 252 rebal_mask = pd.Series(False, index=data.index) rebal_mask.iloc[list(range(warmup, len(data), self.rebal_freq))] = True weights[~rebal_mask] = np.nan weights = weights.ffill().fillna(0.0) weights.iloc[:warmup] = 0.0 return weights.shift(1).fillna(0.0) # --------------------------------------------------------------------------- # Sub-strategy ensemble # --------------------------------------------------------------------------- class SubStrategyEnsemble(Strategy): """Equal-weight blend of several long-only sub-strategies.""" def __init__(self, sub_strats: list[Strategy]): self.sub_strats = sub_strats self.w = 1.0 / len(sub_strats) def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: out = None for strat in self.sub_strats: sig = strat.generate_signals(data) * self.w out = sig if out is None else out.add(sig, fill_value=0.0) return out def default_ensemble(mkt_returns: pd.Series) -> SubStrategyEnsemble: return SubStrategyEnsemble([ RecoveryMomentumStrategy(top_n=10), FactorComboStrategy("up_cap+mom_gap", rebal_freq=21, top_n=10), MultiplicativeFactorStrategy( factor_names=["mom_12_1", "recovery_63", "idio_vol_neg"], top_n=10, rebal_freq=21, mkt_returns=mkt_returns, ), ])