"""
Factor combination strategies discovered through iterative factor research.

The names below are the keys of ``SIGNAL_REGISTRY``.

US champions:
  - rec_mfilt+deep_upvol: Recovery (momentum-filtered) + deep recovery × up-volume
  - ma200+mom7m+rec126: Above MA200 + intermediate momentum + deep recovery
  - rec_mfilt+ma200: Recovery (momentum-filtered) + above MA200
  - mom7m+rec126: Intermediate momentum + deep recovery

CN champions:
  - up_cap+quality_mom: Up-capture ratio + quality momentum composite
  - down_resil+qual_mom: Down-resilience + quality momentum composite
  - rec63+mom_gap: Recovery 63d + momentum × gap-up frequency
  - up_cap+mom_gap: Up-capture + momentum × gap-up frequency

Each can run at daily/weekly/biweekly/monthly rebalancing frequency.
"""

import numpy as np
import pandas as pd

from strategies.base import Strategy


# ---------------------------------------------------------------------------
# Factor building blocks
# ---------------------------------------------------------------------------
# Every factor takes ``p``, a price DataFrame, and returns a DataFrame of the
# same shape.  Cross-sectional operations use ``axis=1``, i.e. they treat each
# row as one date and each column as one instrument.

def _rank(df: pd.DataFrame) -> pd.DataFrame:
    """Row-wise percentile rank in (0, 1]; NaN inputs stay NaN."""
    return df.rank(axis=1, pct=True, na_option="keep")


def _hit_flags(ret: pd.DataFrame, threshold: float) -> pd.DataFrame:
    """Return 1.0 where ``ret > threshold``, 0.0 where not, NaN where missing.

    Keeping NaN (instead of letting the comparison coerce it to 0.0) lets the
    ``min_periods`` argument of downstream rolling windows do its job.
    """
    return (ret > threshold).astype(float).where(ret.notna())


def _keep_rows(df: pd.DataFrame, keep: pd.Series) -> pd.DataFrame:
    """Return a copy of ``df`` with every row where ``keep`` is False set to NaN."""
    arr = df.values.copy()
    arr[~keep.values, :] = np.nan
    return pd.DataFrame(arr, index=df.index, columns=df.columns)


def _mom_12_1(p: pd.DataFrame) -> pd.DataFrame:
    """Momentum skipping the last 21 rows: ``p[t-21] / p[t-252] - 1``."""
    return p.shift(21).pct_change(231)


def _mom_intermediate(p: pd.DataFrame) -> pd.DataFrame:
    """Intermediate momentum skipping the last 21 rows: ``p[t-21] / p[t-168] - 1``."""
    return p.shift(21).pct_change(147)


def _rec_63(p: pd.DataFrame) -> pd.DataFrame:
    """Gain of the current price over its 63-row rolling minimum."""
    return p / p.rolling(63, min_periods=63).min() - 1


def _rec_126(p: pd.DataFrame) -> pd.DataFrame:
    """Gain of the current price over its 126-row rolling minimum."""
    return p / p.rolling(126, min_periods=126).min() - 1


def _above_ma200(p: pd.DataFrame) -> pd.DataFrame:
    """Relative distance of the current price from its 200-row moving average."""
    return p / p.rolling(200, min_periods=200).mean() - 1


def _up_volume_proxy(p: pd.DataFrame) -> pd.DataFrame:
    """Sum of positive one-row returns over the last 20 rows (at least 15 valid).

    Negative returns contribute 0.  Missing returns stay NaN so that they do
    not count towards ``min_periods``.
    """
    ret = p.pct_change()
    # clip() floors negatives at 0 but, unlike ``where(ret > 0, 0)``, leaves
    # NaN untouched.
    return ret.clip(lower=0).rolling(20, min_periods=15).sum()


def _gap_up_freq(p: pd.DataFrame) -> pd.DataFrame:
    """Share of the last 60 rows (at least 40 valid) with a return above 1%."""
    ret = p.pct_change()
    return _hit_flags(ret, 0.01).rolling(60, min_periods=40).mean()


def _consistent_returns(p: pd.DataFrame) -> pd.DataFrame:
    """Share of the last 252 rows (at least 126 valid) with a positive return."""
    ret = p.pct_change()
    return _hit_flags(ret, 0).rolling(252, min_periods=126).mean()


def _rec_mom_filtered(p: pd.DataFrame) -> pd.DataFrame:
    """126-row recovery, kept only where ``p[t-21] / p[t-126] - 1`` is positive.

    Entries whose momentum is non-positive or missing become NaN.
    """
    mom = p.shift(21).pct_change(105)
    return _rec_126(p).where(mom > 0, np.nan)


def _up_capture(p: pd.DataFrame) -> pd.DataFrame:
    """Ratio of each column's mean return to the market's on up-market rows.

    The "market" is the equal-weighted row mean of returns.  Only rows where
    that mean is positive are used; both averages are taken over a 60-row
    window that needs at least 20 such rows.
    """
    ret = p.pct_change()
    mkt = ret.mean(axis=1)
    up_mkt = mkt > 0

    stock_avg = _keep_rows(ret, up_mkt).rolling(60, min_periods=20).mean()
    mkt_avg = mkt.where(up_mkt, np.nan).rolling(60, min_periods=20).mean()
    return stock_avg.div(mkt_avg, axis=0)


def _down_resilience(p: pd.DataFrame) -> pd.DataFrame:
    """Negated mean return on down-market rows (120-row window, at least 30).

    The "market" is the equal-weighted row mean of returns; a row is a
    down-market row when that mean is negative.
    """
    ret = p.pct_change()
    down_mkt = ret.mean(axis=1) < 0
    return -_keep_rows(ret, down_mkt).rolling(120, min_periods=30).mean()


def _quality_mom(p: pd.DataFrame) -> pd.DataFrame:
    """Weighted rank blend: 0.4 momentum + 0.3 consistency + 0.3 up-volume."""
    return (0.4 * _rank(_mom_12_1(p))
            + 0.3 * _rank(_consistent_returns(p))
            + 0.3 * _rank(_up_volume_proxy(p)))


def _mom_x_gap(p: pd.DataFrame) -> pd.DataFrame:
    """Product of the momentum rank and the gap-up-frequency rank."""
    return _rank(_mom_12_1(p)) * _rank(_gap_up_freq(p))


# ---------------------------------------------------------------------------
# Combo signal constructors (weighted rank sums)
# ---------------------------------------------------------------------------
# A NaN in any component makes the combined score NaN for that entry.

# US combos
def signal_rec_mfilt_deep_upvol(p: pd.DataFrame) -> pd.DataFrame:
    """Equal blend of filtered-recovery rank and re-ranked (recovery × up-volume)."""
    rec_mfilt_r = _rank(_rec_mom_filtered(p))
    # The product of two ranks is ranked again so both halves share a scale.
    deep_upvol_rr = _rank(_rank(_rec_126(p)) * _rank(_up_volume_proxy(p)))
    return 0.5 * rec_mfilt_r + 0.5 * deep_upvol_rr


def signal_ma200_mom7m_rec126(p: pd.DataFrame) -> pd.DataFrame:
    """Blend of above-MA200, intermediate-momentum and 126-row recovery ranks."""
    return (0.33 * _rank(_above_ma200(p))
            + 0.33 * _rank(_mom_intermediate(p))
            + 0.34 * _rank(_rec_126(p)))


def signal_rec_mfilt_ma200(p: pd.DataFrame) -> pd.DataFrame:
    """Equal blend of filtered-recovery rank and above-MA200 rank."""
    return 0.5 * _rank(_rec_mom_filtered(p)) + 0.5 * _rank(_above_ma200(p))


def signal_mom7m_rec126(p: pd.DataFrame) -> pd.DataFrame:
    """Equal blend of intermediate-momentum rank and 126-row recovery rank."""
    return 0.5 * _rank(_mom_intermediate(p)) + 0.5 * _rank(_rec_126(p))


# CN combos
def signal_up_cap_quality_mom(p: pd.DataFrame) -> pd.DataFrame:
    """Equal blend of up-capture rank and quality-momentum rank."""
    return 0.5 * _rank(_up_capture(p)) + 0.5 * _rank(_quality_mom(p))


def signal_down_resil_qual_mom(p: pd.DataFrame) -> pd.DataFrame:
    """Equal blend of down-resilience rank and quality-momentum rank."""
    return 0.5 * _rank(_down_resilience(p)) + 0.5 * _rank(_quality_mom(p))


def signal_rec63_mom_gap(p: pd.DataFrame) -> pd.DataFrame:
    """Equal blend of 63-row recovery rank and (momentum × gap-up) rank."""
    return 0.5 * _rank(_rec_63(p)) + 0.5 * _rank(_mom_x_gap(p))


def signal_up_cap_mom_gap(p: pd.DataFrame) -> pd.DataFrame:
    """Equal blend of up-capture rank and (momentum × gap-up) rank."""
    return 0.5 * _rank(_up_capture(p)) + 0.5 * _rank(_mom_x_gap(p))


# ---------------------------------------------------------------------------
# Signal registry: name -> callable(prices) -> DataFrame
# ---------------------------------------------------------------------------

SIGNAL_REGISTRY = {
    # US
    "rec_mfilt+deep_upvol": signal_rec_mfilt_deep_upvol,
    "ma200+mom7m+rec126": signal_ma200_mom7m_rec126,
    "rec_mfilt+ma200": signal_rec_mfilt_ma200,
    "mom7m+rec126": signal_mom7m_rec126,
    # CN
    "up_cap+quality_mom": signal_up_cap_quality_mom,
    "down_resil+qual_mom": signal_down_resil_qual_mom,
    "rec63+mom_gap": signal_rec63_mom_gap,
    "up_cap+mom_gap": signal_up_cap_mom_gap,
}


# ---------------------------------------------------------------------------
# Strategy class
# ---------------------------------------------------------------------------

class FactorComboStrategy(Strategy):
    """
    Generic factor-combination strategy with configurable rebalancing frequency.

    Parameters:
        signal_name: key into SIGNAL_REGISTRY
        rebal_freq: rebalancing interval in trading days
                    (1=daily, 5=weekly, 10=biweekly, 21=monthly)
        top_n: number of stocks to hold
        warmup: number of leading rows that always carry zero weight; the
                first rebalance happens on row ``warmup``

    Raises:
        ValueError: if ``signal_name`` is not registered, if ``rebal_freq`` or
                    ``top_n`` is smaller than 1, or if ``warmup`` is negative.
    """

    REBAL_LABELS = {1: "daily", 5: "weekly", 10: "biweekly", 21: "monthly"}

    def __init__(self, signal_name: str, rebal_freq: int = 21,
                 top_n: int = 10, warmup: int = 252):
        if signal_name not in SIGNAL_REGISTRY:
            raise ValueError(f"Unknown signal: {signal_name}. "
                             f"Available: {list(SIGNAL_REGISTRY.keys())}")
        # Fail at construction time rather than silently producing an
        # all-zero book (negative step / top_n < 1) or crashing later inside
        # the rebalance schedule (step of 0).
        if rebal_freq < 1:
            raise ValueError(f"rebal_freq must be >= 1, got {rebal_freq}")
        if top_n < 1:
            raise ValueError(f"top_n must be >= 1, got {top_n}")
        if warmup < 0:
            raise ValueError(f"warmup must be >= 0, got {warmup}")

        # NOTE(review): Strategy.__init__ is not invoked here (nor was it
        # before); confirm the base class needs no initialisation.
        self.signal_name = signal_name
        self.signal_func = SIGNAL_REGISTRY[signal_name]
        self.rebal_freq = rebal_freq
        self.top_n = top_n
        self.warmup = warmup

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Turn prices into equal-weight target weights for the top-ranked names.

        Args:
            data: price DataFrame passed straight to the registered signal
                function.

        Returns:
            DataFrame shaped like the signal.  Each row is either all zeros
            or equal weights summing to 1 over the selected columns.  Weights
            are refreshed every ``rebal_freq`` rows starting at row
            ``warmup``, held in between, and lagged by one row.
        """
        sig = self.signal_func(data)

        # Select top_n by signal rank.  NaN scores are ranked last, and rows
        # with fewer than top_n valid scores select nothing at all.
        rank = sig.rank(axis=1, ascending=False, na_option="bottom")
        enough = sig.notna().sum(axis=1) >= self.top_n
        top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)

        raw = top_mask.astype(float)
        # Empty rows get a NaN divisor so they end up as 0.0, not a 0/0 error.
        row_sums = raw.sum(axis=1).replace(0, np.nan)
        signals = raw.div(row_sums, axis=0).fillna(0.0)

        # Rebalance at configured frequency: blank out every non-rebalance
        # row, then carry the last rebalance forward.
        rebal_mask = pd.Series(False, index=data.index)
        rebal_mask.iloc[self.warmup::self.rebal_freq] = True
        signals[~rebal_mask] = np.nan
        signals = signals.ffill().fillna(0.0)
        signals.iloc[:self.warmup] = 0.0

        # Lag by one row so weights computed from row t apply from row t+1.
        return signals.shift(1).fillna(0.0)