import numpy as np import pandas as pd from strategies.base import Strategy class MomentumQualityStrategy(Strategy): """ Momentum + quality factor strategy using price-derived signals only. Quality proxy (from price data): - Return consistency: fraction of positive monthly returns over past year (stocks that grind up steadily are higher "quality" than volatile jumpers) - Low max drawdown: smaller peak-to-trough drop = more stable Combined with momentum, this favors stocks with strong AND stable uptrends, filtering out lottery-ticket stocks that spike then crash. """ def __init__(self, momentum_period: int = 252, skip: int = 21, quality_window: int = 252, top_n: int = 20): self.momentum_period = momentum_period self.skip = skip self.quality_window = quality_window self.top_n = top_n def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame: # --- Momentum factor --- momentum = data.shift(self.skip).pct_change(self.momentum_period - self.skip) # --- Quality factor 1: return consistency --- # Rolling 21-day (monthly) returns, then count fraction positive over past year monthly_ret = data.pct_change(21) consistency = monthly_ret.rolling(self.quality_window).apply( lambda x: (x > 0).sum() / len(x), raw=True ) # --- Quality factor 2: inverse max drawdown --- # Smaller drawdown = higher quality def rolling_max_dd(prices: pd.DataFrame, window: int) -> pd.DataFrame: rolling_max = prices.rolling(window).max() drawdown = prices / rolling_max - 1 # Rolling worst drawdown (most negative) worst_dd = drawdown.rolling(window).min() # Invert: less negative = better, so negate return -worst_dd # higher = smaller drawdown = better inv_dd = rolling_max_dd(data, self.quality_window) # --- Cross-sectional ranking --- mom_rank = momentum.rank(axis=1, pct=True, na_option="bottom") con_rank = consistency.rank(axis=1, pct=True, na_option="bottom") dd_rank = inv_dd.rank(axis=1, pct=True, na_option="bottom") # Composite: momentum 50%, consistency 25%, drawdown 25% scores = 0.50 * mom_rank + 0.25 * con_rank + 0.25 * dd_rank # --- Select top_n --- n_valid = scores.notna().sum(axis=1) enough = n_valid >= self.top_n score_rank = scores.rank(axis=1, ascending=False, na_option="bottom") top_mask = (score_rank <= self.top_n) & enough.values.reshape(-1, 1) raw = top_mask.astype(float) row_sums = raw.sum(axis=1).replace(0, np.nan) signals = raw.div(row_sums, axis=0).fillna(0.0) warmup = max(self.momentum_period, self.quality_window) + 21 signals.iloc[:warmup] = 0.0 return signals.shift(1).fillna(0.0)