Files
quant/research/interaction_alpha.py

234 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Interaction / multiplicative factor strategy.
Rationale: in the 10y PIT diagnostics, each single-factor top decile clocks
~0.5-0.8 Sharpe, yet the production Recovery+Mom Top10 delivers 0.92. The
extra alpha comes from an AND-style interaction — stocks that rank high on
BOTH factors simultaneously. Linear rank-blending loses this because a stock
can make top_n by being middling on many factors instead of extreme on a few.
This module provides:
* `MultiplicativeFactorStrategy` — picks top_n stocks by the geometric mean
(equivalently the product) of cross-sectional factor ranks. Concentrates
on consensus winners.
* `VotingFactorStrategy` — counts how many factors place a stock in its
top `vote_pct`; selects stocks clearing a minimum vote threshold. Breaks
ties by the sum of ranks. Robust when factor ICs drift.
* `SubStrategyEnsemble` — equal-weight blend of Recovery+Mom Top10,
fc_up_cap+mom_gap monthly, and a new Multiplicative("mom × recovery ×
idio_vol_neg") sleeve. Diversifies across independent alpha sources
rather than across factor primitives.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from research.alpha_factors import (_rolling_beta_and_residvol, f_mom_12_1,
f_mom_7_1, f_rev_1m, f_w52_high, f_max5_neg,
f_recovery_63, f_trend_strength, xsec_rank,
f_mom_residual)
from strategies.base import Strategy
from strategies.factor_combo import FactorComboStrategy
from strategies.recovery_momentum import RecoveryMomentumStrategy
# ---------------------------------------------------------------------------
# Multiplicative top-N
# ---------------------------------------------------------------------------
class MultiplicativeFactorStrategy(Strategy):
    """
    Top-N by product of selected factor ranks (equivalent to rank-geometric-mean).

    Parameters
    ----------
    factor_names : list[str]
        Keys into the factor library. Supported:
        mom_12_1, mom_7_1, mom_residual, recovery_63, w52_high,
        idio_vol_neg, low_beta, trend.
    top_n : int
        Number of stocks.
    rebal_freq : int
        Rebal interval in trading days.
    mkt_returns : pd.Series | None
        Required for mom_residual / idio_vol_neg / low_beta.
    weighting : str
        "equal" (same weight for every pick), "inv_vol" (inverse of the
        60-row rolling std of ``data.pct_change``) or "signal" (composite
        score raised to ``signal_concentration``).
    signal_concentration : float
        Exponent applied to composite score when weighting=='signal'.
        0 → equal weight within top_n; higher → more weight on top ranks.
        Negative values are treated as 0.
    dispersion_scale : bool
        Scale total exposure by z-scored cross-sectional dispersion of the
        composite: ``1 + 0.3 * clip(z, -1, 1)``, i.e. within [0.7, 1.3]
        (the additional clip to [0.5, 1.3] is never binding).
    """

    # Factors that need the rolling beta / residual-vol estimates.
    _BETA_FACTORS = ("mom_residual", "idio_vol_neg", "low_beta")
    # Rows at the start of the sample that always carry zero weight.
    _WARMUP = 252

    def __init__(self, factor_names: list[str], top_n: int = 10,
                 rebal_freq: int = 21, mkt_returns: pd.Series | None = None,
                 weighting: str = "equal", signal_concentration: float = 0.0,
                 dispersion_scale: bool = False):
        self.factor_names = factor_names
        self.top_n = top_n
        self.rebal_freq = rebal_freq
        self.mkt_returns = mkt_returns
        self.weighting = weighting
        self.signal_concentration = signal_concentration
        self.dispersion_scale = dispersion_scale

    def _build(self, data: pd.DataFrame) -> dict[str, pd.DataFrame]:
        """Compute the raw (un-ranked) factor frames named in ``factor_names``.

        Raises
        ------
        ValueError
            If a beta-based factor is requested without ``mkt_returns``.
        KeyError
            If a requested factor name is not in the library.
        """
        betas, resid_vol = (None, None)
        if any(name in self._BETA_FACTORS for name in self.factor_names):
            if self.mkt_returns is None:
                raise ValueError("mkt_returns required for beta-based factors")
            # Third argument is presumably the rolling window length — confirm
            # against research.alpha_factors.
            betas, resid_vol = _rolling_beta_and_residvol(data, self.mkt_returns, 60)
        # Lazy thunks so only the requested factors are actually computed.
        lib = {
            "mom_12_1": lambda: f_mom_12_1(data),
            "mom_7_1": lambda: f_mom_7_1(data),
            "mom_residual": lambda: f_mom_residual(data, self.mkt_returns, betas=betas),
            "recovery_63": lambda: f_recovery_63(data),
            "w52_high": lambda: f_w52_high(data),
            "idio_vol_neg": lambda: -resid_vol,
            "low_beta": lambda: -betas,
            "trend": lambda: f_trend_strength(data),
        }
        unknown = [name for name in self.factor_names if name not in lib]
        if unknown:
            # Still a KeyError (as a plain dict lookup would raise), but with
            # a message that tells the caller what is available.
            raise KeyError(
                f"unknown factor(s) {unknown!r}; supported: {sorted(lib)!r}")
        return {name: lib[name]() for name in self.factor_names}

    def _raw_weights(self, data: pd.DataFrame, composite: pd.DataFrame,
                     top_mask: pd.DataFrame) -> pd.DataFrame:
        """Un-normalised weights for the selected names (zero elsewhere)."""
        if self.weighting == "equal":
            return top_mask.astype(float)
        if self.weighting == "inv_vol":
            vol = data.pct_change(fill_method=None).rolling(60).std()
            return (1.0 / vol.replace(0, np.nan)).where(top_mask, 0.0).fillna(0.0)
        if self.weighting == "signal":
            # Weight ∝ composite^concentration, only among top_mask picks.
            # The exponent is floored at 0 (not 1) so the documented
            # "0 → equal weight" case holds. The mask is re-applied after the
            # power because x ** 0 == 1 even for NaN / non-selected cells.
            exponent = max(self.signal_concentration, 0.0)
            powered = composite.where(top_mask) ** exponent
            return powered.where(top_mask, 0.0).fillna(0.0)
        raise ValueError(f"bad weighting {self.weighting!r}")

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return the weight frame, lagged by one row.

        Weights are recomputed every ``rebal_freq`` rows starting at row 252
        and held constant in between; the first 252 rows are always zero.

        Raises
        ------
        ValueError
            If ``factor_names`` is empty or ``weighting`` is not recognised.
        """
        if not self.factor_names:
            raise ValueError("factor_names must contain at least one factor")

        # Product of ranks. NaN propagates through the multiplication, so a
        # stock missing any single factor drops out of the composite.
        composite = None
        for values in self._build(data).values():
            rk = xsec_rank(values)
            composite = rk if composite is None else composite.mul(rk)

        sel_rank = composite.rank(axis=1, ascending=False, na_option="bottom")
        # Rows with fewer than top_n scored names select nothing at all.
        enough = composite.notna().sum(axis=1) >= self.top_n
        top_mask = (sel_rank <= self.top_n) & enough.values.reshape(-1, 1)

        raw = self._raw_weights(data, composite, top_mask)
        row_sums = raw.sum(axis=1).replace(0, np.nan)
        weights = raw.div(row_sums, axis=0).fillna(0.0)

        # Keep weights only on rebalance rows, then carry them forward.
        warmup = self._WARMUP
        rebal_mask = pd.Series(False, index=data.index)
        rebal_mask.iloc[list(range(warmup, len(data), self.rebal_freq))] = True
        weights[~rebal_mask] = np.nan
        weights = weights.ffill().fillna(0.0)
        weights.iloc[:warmup] = 0.0

        if self.dispersion_scale:
            # Cross-sectional dispersion = per-row std of composite. Scale
            # exposure up in high-dispersion regimes (alpha opportunity richer).
            disp = composite.std(axis=1)
            rolling = disp.rolling(252, min_periods=126)
            z = (disp - rolling.mean()) / rolling.std()
            scale = (1.0 + 0.3 * z.clip(-1, 1)).clip(0.5, 1.3)
            # Rows without enough history for a z-score keep full exposure.
            scale = scale.reindex(weights.index).fillna(1.0)
            weights = weights.mul(scale, axis=0)

        # Lag by one row: weights computed on row t take effect on row t + 1.
        return weights.shift(1).fillna(0.0)
# ---------------------------------------------------------------------------
# Voting top-N
# ---------------------------------------------------------------------------
class VotingFactorStrategy(Strategy):
    """
    Consensus-vote stock picker.

    Each factor casts one vote for every stock whose cross-sectional rank is
    at or above ``1 - vote_pct``. Stocks collecting at least ``min_votes``
    votes are eligible; up to ``top_n`` of them are held at equal weight,
    ordered by vote count and then by the sum of their ranks.

    Parameters
    ----------
    factor_names : list[str]
        Factor keys, resolved through ``MultiplicativeFactorStrategy._build``.
    top_n : int
        Maximum number of stocks held.
    rebal_freq : int
        Rows between rebalances.
    vote_pct : float
        A factor votes for a stock when its rank is >= ``1 - vote_pct``.
    min_votes : int
        Minimum number of votes a stock needs to be eligible.
    mkt_returns : pd.Series | None
        Forwarded to the factor builder.
    """

    def __init__(self, factor_names: list[str], top_n: int = 10,
                 rebal_freq: int = 21, vote_pct: float = 0.25,
                 min_votes: int = 3, mkt_returns: pd.Series | None = None):
        self.factor_names = factor_names
        self.top_n = top_n
        self.rebal_freq = rebal_freq
        self.vote_pct = vote_pct
        self.min_votes = min_votes
        self.mkt_returns = mkt_returns

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return the equal-weight holdings frame, lagged by one row."""
        # Borrow the factor library from the multiplicative strategy.
        factor_source = MultiplicativeFactorStrategy(
            factor_names=self.factor_names, top_n=self.top_n,
            rebal_freq=self.rebal_freq, mkt_returns=self.mkt_returns)
        rank_frames = [xsec_rank(frame)
                       for frame in factor_source._build(data).values()]

        cutoff = 1 - self.vote_pct
        vote_count = sum((rk >= cutoff).astype(float) for rk in rank_frames)
        rank_total = sum(rk.fillna(0) for rk in rank_frames)

        # Votes dominate; the scaled rank sum only separates equal vote
        # counts (assumes xsec_rank yields values in [0, 1] — TODO confirm).
        score = vote_count + rank_total / (len(rank_frames) * 10)
        score = score.where(vote_count >= self.min_votes, np.nan)

        placement = score.rank(axis=1, ascending=False, na_option="bottom")
        eligible = score.notna().sum(axis=1)
        # Hold fewer than top_n names when fewer clear the vote threshold.
        slots = eligible.clip(upper=self.top_n).values.reshape(-1, 1)
        any_eligible = (eligible >= 1).values.reshape(-1, 1)
        chosen = (placement <= slots) & any_eligible

        picks = chosen.astype(float)
        per_row_total = picks.sum(axis=1).replace(0, np.nan)
        weights = picks.div(per_row_total, axis=0).fillna(0.0)

        # Blank out non-rebalance rows so the forward-fill holds positions.
        warmup = 252
        is_rebal_row = pd.Series(False, index=data.index)
        rebal_positions = list(range(warmup, len(data), self.rebal_freq))
        is_rebal_row.iloc[rebal_positions] = True
        weights[~is_rebal_row] = np.nan
        weights = weights.ffill().fillna(0.0)
        weights.iloc[:warmup] = 0.0

        # Weights computed on row t take effect on row t + 1.
        lagged = weights.shift(1)
        return lagged.fillna(0.0)
# ---------------------------------------------------------------------------
# Sub-strategy ensemble
# ---------------------------------------------------------------------------
class SubStrategyEnsemble(Strategy):
    """Equal-weight blend of several long-only sub-strategies.

    Parameters
    ----------
    sub_strats : list[Strategy]
        Sub-strategies to blend; any iterable is accepted and copied into a
        list. Each receives weight ``1 / len(sub_strats)``.

    Raises
    ------
    ValueError
        If ``sub_strats`` is empty.
    """

    def __init__(self, sub_strats: list[Strategy]):
        # Copy so later mutation of the caller's list cannot make ``self.w``
        # disagree with the number of sleeves actually blended.
        members = list(sub_strats)
        if not members:
            raise ValueError("SubStrategyEnsemble needs at least one sub-strategy")
        self.sub_strats = members
        self.w = 1.0 / len(members)

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return the sum of every sub-strategy's signals scaled by ``self.w``."""
        out = None
        for strat in self.sub_strats:
            sig = strat.generate_signals(data) * self.w
            # fill_value=0.0: a cell present in only one of the two frames is
            # treated as zero weight in the other rather than producing NaN.
            out = sig if out is None else out.add(sig, fill_value=0.0)
        return out
def default_ensemble(mkt_returns: pd.Series) -> SubStrategyEnsemble:
    """Build the default three-sleeve ensemble.

    Sleeves, in order: ``RecoveryMomentumStrategy`` (top 10), the
    ``"up_cap+mom_gap"`` ``FactorComboStrategy`` (top 10, 21-row rebalance),
    and a ``MultiplicativeFactorStrategy`` over mom_12_1, recovery_63 and
    idio_vol_neg (top 10, 21-row rebalance), which consumes ``mkt_returns``.
    """
    recovery_sleeve = RecoveryMomentumStrategy(top_n=10)
    combo_sleeve = FactorComboStrategy("up_cap+mom_gap", rebal_freq=21, top_n=10)
    multiplicative_sleeve = MultiplicativeFactorStrategy(
        factor_names=["mom_12_1", "recovery_63", "idio_vol_neg"],
        top_n=10,
        rebal_freq=21,
        mkt_returns=mkt_returns,
    )
    sleeves = [recovery_sleeve, combo_sleeve, multiplicative_sleeve]
    return SubStrategyEnsemble(sleeves)