""" Factor Research Script — Professional QR-style factor mining. Tests candidate alpha factors using: - Information Coefficient (IC): rank correlation of signal vs forward returns - IC Information Ratio (ICIR): mean(IC) / std(IC), measures signal consistency - Quintile long-short spread: monotonicity of returns across signal buckets - Turnover: daily rank change, proxy for trading cost - Decay profile: IC at 1d, 5d, 10d, 20d horizons - Sub-period stability: IC consistency across rolling windows - Factor correlation matrix: ensures new factors are orthogonal to known ones Usage: uv run python factor_research.py --market us uv run python factor_research.py --market cn """ from __future__ import annotations import argparse import warnings import numpy as np import pandas as pd import data_manager from universe import UNIVERSES warnings.filterwarnings("ignore", category=FutureWarning) HORIZONS = [1, 5, 10, 20] # --------------------------------------------------------------------------- # Factor definitions — each returns a DataFrame (dates x stocks) of scores # --------------------------------------------------------------------------- def _safe_rank(df: pd.DataFrame) -> pd.DataFrame: return df.rank(axis=1, pct=True, na_option="keep") def _rolling_ret(prices: pd.DataFrame, window: int) -> pd.DataFrame: return prices.pct_change(window) # --- Known factors (baselines) --- def factor_momentum_12_1(prices: pd.DataFrame) -> pd.DataFrame: """Classic 12-1 month momentum.""" return prices.shift(21).pct_change(231) def factor_recovery(prices: pd.DataFrame) -> pd.DataFrame: """Price / 63-day low - 1.""" return prices / prices.rolling(63, min_periods=63).min() - 1 def factor_inverse_vol(prices: pd.DataFrame) -> pd.DataFrame: """Negative 60-day realized volatility (low vol = high score).""" return -prices.pct_change().rolling(60, min_periods=60).std() # --- NEW candidate factors --- def factor_short_term_reversal(prices: pd.DataFrame) -> pd.DataFrame: """5-day return reversal. Hypothesis: short-term mean reversion.""" return -prices.pct_change(5) def factor_idio_vol_change(prices: pd.DataFrame) -> pd.DataFrame: """Change in idiosyncratic volatility (20d vs 60d). Hypothesis: declining vol = stabilizing, predicts positive returns.""" ret = prices.pct_change() vol_20 = ret.rolling(20, min_periods=20).std() vol_60 = ret.rolling(60, min_periods=60).std() return -(vol_20 / vol_60.replace(0, np.nan) - 1) # negative = vol declining def factor_volume_price_divergence(prices: pd.DataFrame, volume: pd.DataFrame | None = None) -> pd.DataFrame: """Price up but momentum fading — proxy via acceleration. Without volume data, use return acceleration as proxy.""" ret_5 = prices.pct_change(5) ret_20 = prices.pct_change(20) return ret_5 - ret_20 / 4 # recent returns outpacing trend def factor_max_drawdown_recovery(prices: pd.DataFrame) -> pd.DataFrame: """How much of the 60-day max drawdown has been recovered. Hypothesis: stocks that recover from drawdowns continue recovering.""" rolling_max = prices.rolling(60, min_periods=60).max() drawdown = prices / rolling_max - 1 # negative rolling_min_dd = drawdown.rolling(60, min_periods=20).min() # worst drawdown recovery_pct = drawdown / rolling_min_dd.replace(0, np.nan) return recovery_pct # closer to 0 = more recovered def factor_skewness(prices: pd.DataFrame) -> pd.DataFrame: """Negative 20-day return skewness. 

def factor_skewness(prices: pd.DataFrame) -> pd.DataFrame:
    """Negative 20-day return skewness.

    Hypothesis: positively skewed (lottery-like) stocks are overpriced, so
    negative skew earns a premium."""
    ret = prices.pct_change()
    return -ret.rolling(20, min_periods=20).skew()


def factor_high_low_range(prices: pd.DataFrame) -> pd.DataFrame:
    """20-day high-low range relative to price.

    Hypothesis: narrow range = consolidation, breakout ahead."""
    high_20 = prices.rolling(20, min_periods=20).max()
    low_20 = prices.rolling(20, min_periods=20).min()
    mid = (high_20 + low_20) / 2
    return -(high_20 - low_20) / mid.replace(0, np.nan)  # narrow range = high score


def factor_mean_reversion_residual(prices: pd.DataFrame) -> pd.DataFrame:
    """Distance from 20-day MA as fraction of 60-day vol.

    Hypothesis: stocks far below MA revert. Z-score style."""
    ma_20 = prices.rolling(20, min_periods=20).mean()
    vol_60 = prices.pct_change().rolling(60, min_periods=60).std() * prices  # price-unit vol
    return -(prices - ma_20) / vol_60.replace(0, np.nan)  # below MA = high score


def factor_up_down_vol_ratio(prices: pd.DataFrame) -> pd.DataFrame:
    """Ratio of upside to downside semi-deviation (20d).

    Hypothesis: stocks with more upside vol have positive momentum."""
    ret = prices.pct_change()
    up_vol = ret.where(ret > 0, 0).rolling(20, min_periods=15).std()
    down_vol = ret.where(ret < 0, 0).rolling(20, min_periods=15).std()
    return up_vol / down_vol.replace(0, np.nan)


def factor_consecutive_up_days(prices: pd.DataFrame) -> pd.DataFrame:
    """Fraction of positive return days in last 10 days.

    Hypothesis: persistent winners keep winning (short-term)."""
    ret = prices.pct_change()
    return (ret > 0).astype(float).rolling(10, min_periods=10).mean()


def factor_gap_momentum(prices: pd.DataFrame) -> pd.DataFrame:
    """Path smoothness: 1-day returns summed over 20 days vs the compounded
    20-day return.

    Hypothesis: smooth consistent returns beat volatile ones."""
    ret_1d = prices.pct_change()
    smoothness = ret_1d.rolling(20, min_periods=20).mean() * 20  # arithmetic sum
    raw_20d = prices.pct_change(20)  # compounded return
    # The arithmetic sum exceeds the compounded return by roughly half the
    # path variance, so a small gap means a smooth path: score smooth high.
    return raw_20d - smoothness


def factor_recovery_acceleration(prices: pd.DataFrame) -> pd.DataFrame:
    """Rate of change of recovery factor.

    Hypothesis: accelerating recovery is stronger signal than level."""
    recovery = prices / prices.rolling(63, min_periods=63).min() - 1
    # diff, not pct_change: recovery is exactly 0 at a fresh 63-day low, and
    # pct_change from 0 would produce inf scores.
    return recovery.diff(5)


def factor_trend_strength(prices: pd.DataFrame) -> pd.DataFrame:
    """R-squared of log-price vs time over 60 days.

    Hypothesis: stocks trending linearly (high R2) continue."""
    log_p = np.log(prices.replace(0, np.nan))

    def _r2(series):
        y = series.dropna().values
        if len(y) < 30:
            return np.nan
        x = np.arange(len(y), dtype=float)
        x -= x.mean()
        y_dm = y - y.mean()
        ss_xy = (x * y_dm).sum()
        ss_xx = (x * x).sum()
        ss_yy = (y_dm * y_dm).sum()
        if ss_xx == 0 or ss_yy == 0:
            return np.nan
        r2 = (ss_xy ** 2) / (ss_xx * ss_yy)
        slope = ss_xy / ss_xx
        return r2 if slope > 0 else -r2  # sign by trend direction

    return log_p.rolling(60, min_periods=30).apply(_r2, raw=False)


def factor_relative_volume_momentum(prices: pd.DataFrame) -> pd.DataFrame:
    """Price momentum weighted by how 'cheap' a stock is relative to 52-week range.

    Hypothesis: momentum in stocks near lows is more likely to persist."""
    mom_20 = prices.pct_change(20)
    high_252 = prices.rolling(252, min_periods=126).max()
    low_252 = prices.rolling(252, min_periods=126).min()
    position_in_range = (prices - low_252) / (high_252 - low_252).replace(0, np.nan)
    return mom_20 * (1 - position_in_range)  # momentum * cheapness

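
# Illustrative check of the signed-R^2 convention above (toy series is an
# assumption for demonstration; not called by the pipeline): a clean uptrend
# should score near +1, since its log-price is exactly linear in time.
def _sanity_check_trend_r2() -> None:
    dates = pd.date_range("2023-01-01", periods=80, freq="B")
    up = pd.DataFrame({"TREND": 100 * np.exp(np.linspace(0.0, 0.4, 80))}, index=dates)
    score = factor_trend_strength(up)["TREND"].iloc[-1]
    assert score > 0.99
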

def factor_52w_high_distance(prices: pd.DataFrame) -> pd.DataFrame:
    """Distance from 52-week high.

    Hypothesis: stocks near their highs continue (anchoring bias)."""
    high_252 = prices.rolling(252, min_periods=126).max()
    return prices / high_252  # closer to 1 = near high


def factor_downside_beta_proxy(prices: pd.DataFrame) -> pd.DataFrame:
    """Proxy for downside beta using co-movement on market down days.

    Hypothesis: low downside beta outperforms (asymmetric risk)."""
    ret = prices.pct_change()
    market_ret = ret.mean(axis=1)
    down_days = market_ret < 0
    # Mask non-down-day returns to NaN, then rolling mean.
    # Use numpy for correct broadcasting, wider window (120d) so ~54 down
    # days are available, well above min_periods=20.
    arr = ret.values.copy()
    arr[~down_days.values, :] = np.nan
    down_ret = pd.DataFrame(arr, index=ret.index, columns=ret.columns)
    avg_down = down_ret.rolling(120, min_periods=20).mean()
    return -avg_down  # negative = less downside = good


# --- A-share specific factors ---

def factor_liquidity_premium(prices: pd.DataFrame) -> pd.DataFrame:
    """Amihud illiquidity proxy (using returns only, no volume).

    Hypothesis: illiquid stocks earn premium in A-shares (retail driven)."""
    ret = prices.pct_change()
    # Use absolute return as illiquidity proxy (higher abs ret = less liquid)
    illiq = ret.abs().rolling(20, min_periods=15).mean()
    return illiq

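
# If dollar volume were available, the textbook Amihud (2002) measure could
# replace the returns-only proxy above. Hypothetical sketch: the
# `dollar_volume` frame is an assumption and is not loaded anywhere in this
# script.
def factor_amihud_full(prices: pd.DataFrame, dollar_volume: pd.DataFrame) -> pd.DataFrame:
    """Mean of |daily return| / dollar volume over 20 days (Amihud 2002)."""
    illiq = prices.pct_change().abs() / dollar_volume.replace(0, np.nan)
    return illiq.rolling(20, min_periods=15).mean()
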

def factor_lottery_demand(prices: pd.DataFrame) -> pd.DataFrame:
    """Max daily return in past 20 days (negative).

    Hypothesis: lottery stocks (high max return) underperform. Strong in
    A-shares due to retail speculation."""
    ret = prices.pct_change()
    return -ret.rolling(20, min_periods=15).max()


def factor_turnover_reversal(prices: pd.DataFrame) -> pd.DataFrame:
    """Interaction of short-term returns with volatility.

    High vol + negative return = oversold bounce candidate. Common A-share
    alpha source."""
    ret_5 = prices.pct_change(5)
    vol_20 = prices.pct_change().rolling(20, min_periods=15).std()
    return -ret_5 * vol_20  # oversold + high vol = positive


def factor_price_level(prices: pd.DataFrame) -> pd.DataFrame:
    """Negative absolute price level.

    Hypothesis: low-priced stocks attract retail in A-shares (penny stock
    effect)."""
    return -prices


# ---------------------------------------------------------------------------
# IC and analytics engine
# ---------------------------------------------------------------------------

def compute_ic(
    signal: pd.DataFrame,
    forward_ret: pd.DataFrame,
) -> pd.Series:
    """Cross-sectional rank IC (Spearman) per day."""
    common_idx = signal.index.intersection(forward_ret.index)
    common_cols = signal.columns.intersection(forward_ret.columns)
    sig = signal.loc[common_idx, common_cols]
    fwd = forward_ret.loc[common_idx, common_cols]
    ics = []
    for date in common_idx:
        s = sig.loc[date].dropna()
        f = fwd.loc[date].dropna()
        common = s.index.intersection(f.index)
        if len(common) < 30:
            continue
        ic = s[common].corr(f[common], method="spearman")
        if np.isfinite(ic):
            ics.append((date, ic))
    if not ics:
        return pd.Series(dtype=float)
    return pd.Series(dict(ics))


def compute_quintile_returns(
    signal: pd.DataFrame,
    forward_ret: pd.DataFrame,
    n_quantiles: int = 5,
    horizon: int = 1,
) -> pd.Series:
    """Average forward return by signal quintile, per day, then time-averaged.

    `horizon` is the forward-return window in days, used to annualize."""
    common_idx = signal.index.intersection(forward_ret.index)
    common_cols = signal.columns.intersection(forward_ret.columns)
    sig = signal.loc[common_idx, common_cols]
    fwd = forward_ret.loc[common_idx, common_cols]
    records = []
    for date in common_idx:
        s = sig.loc[date].dropna()
        f = fwd.loc[date].dropna()
        common = s.index.intersection(f.index)
        if len(common) < 50:
            continue
        scores = s[common]
        rets = f[common]
        try:
            quintile = pd.qcut(scores, n_quantiles, labels=False, duplicates="drop")
        except ValueError:
            continue
        for q in range(n_quantiles):
            mask = quintile == q
            if mask.sum() > 0:
                records.append({"date": date, "quintile": q + 1, "return": rets[mask].mean()})
    if not records:
        return pd.Series(dtype=float)
    df = pd.DataFrame(records)
    # Annualize: an h-day return recurs ~252/h times per year.
    return df.groupby("quintile")["return"].mean() * (252 / horizon)


def compute_turnover(signal: pd.DataFrame) -> float:
    """Average daily rank change (turnover proxy)."""
    ranked = signal.rank(axis=1, pct=True, na_option="keep")
    daily_change = ranked.diff().abs().mean(axis=1)
    return float(daily_change.mean())


def compute_factor_correlation(factors: dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Cross-sectional rank correlation between all factor pairs."""
    names = list(factors.keys())
    n = len(names)
    corr_matrix = pd.DataFrame(np.nan, index=names, columns=names)
    # Rank each signal cross-sectionally once, then correlate factor pairs on
    # the pooled (date, stock) observations; Pearson on percentile ranks is a
    # pooled Spearman. (Averaging ranks across stocks per date would be
    # degenerate: the cross-sectional mean of percentile ranks is ~0.5 by
    # construction, regardless of the signal.)
    ranked = {name: sig.rank(axis=1, pct=True, na_option="keep")
              for name, sig in factors.items()}
    for i in range(n):
        corr_matrix.loc[names[i], names[i]] = 1.0
        for j in range(i + 1, n):
            r1, r2 = ranked[names[i]], ranked[names[j]]
            idx = r1.index.intersection(r2.index)
            cols = r1.columns.intersection(r2.columns)
            s1 = r1.loc[idx, cols].stack()
            s2 = r2.loc[idx, cols].stack()
            common = s1.index.intersection(s2.index)
            if len(common) > 100:
                c = s1.loc[common].corr(s2.loc[common])
                corr_matrix.loc[names[i], names[j]] = c
                corr_matrix.loc[names[j], names[i]] = c
    return corr_matrix

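
# Quick illustrative check of the turnover proxy (toy data assumed; not called
# by the pipeline): a signal whose cross-sectional ranking never changes should
# have zero turnover, since percentile ranks are constant day to day.
def _sanity_check_turnover() -> None:
    dates = pd.date_range("2024-01-01", periods=20, freq="B")
    static = pd.DataFrame({"A": 1.0, "B": 2.0, "C": 3.0}, index=dates)
    assert compute_turnover(static) == 0.0
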

def analyze_factor(
    name: str,
    signal: pd.DataFrame,
    prices: pd.DataFrame,
    horizons: list[int] | None = None,
) -> dict:
    """Full single-factor analysis."""
    if horizons is None:
        horizons = HORIZONS
    results = {"name": name}

    # Forward returns at each horizon: fwd_ret[t] is the return earned over
    # (t, t+h], so the signal at t never sees it.
    for h in horizons:
        fwd_ret = prices.pct_change(h).shift(-h)
        ic_series = compute_ic(signal, fwd_ret)
        if len(ic_series) == 0:
            results[f"ic_{h}d"] = np.nan
            results[f"icir_{h}d"] = np.nan
            continue
        ic_mean = ic_series.mean()
        ic_std = ic_series.std()
        icir = ic_mean / ic_std if ic_std > 0 else 0.0
        results[f"ic_{h}d"] = ic_mean
        results[f"icir_{h}d"] = icir
        if h == 1:
            results["ic_1d_series"] = ic_series

    # Quintile analysis at 5-day horizon
    fwd_5d = prices.pct_change(5).shift(-5)
    quintiles = compute_quintile_returns(signal, fwd_5d, horizon=5)
    if not quintiles.empty:
        results["q5_return"] = float(quintiles.iloc[-1])  # top quintile
        results["q1_return"] = float(quintiles.iloc[0])  # bottom quintile
        results["long_short_ann"] = float(quintiles.iloc[-1] - quintiles.iloc[0])
        results["monotonicity"] = float(quintiles.corr(
            pd.Series(range(1, len(quintiles) + 1), index=quintiles.index)))
        results["quintile_returns"] = quintiles
    else:
        results["q5_return"] = np.nan
        results["q1_return"] = np.nan
        results["long_short_ann"] = np.nan
        results["monotonicity"] = np.nan

    # Turnover
    results["turnover"] = compute_turnover(signal)

    # Sub-period IC stability (rolling 252-day IC mean)
    if "ic_1d_series" in results and len(results["ic_1d_series"]) > 252:
        rolling_ic = results["ic_1d_series"].rolling(252).mean().dropna()
        results["ic_stability"] = float((rolling_ic > 0).mean())  # fraction of time IC > 0
        results["ic_worst_year"] = float(rolling_ic.min())
        results["ic_best_year"] = float(rolling_ic.max())
    else:
        results["ic_stability"] = np.nan
        results["ic_worst_year"] = np.nan
        results["ic_best_year"] = np.nan

    return results


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def get_all_factors(prices: pd.DataFrame, market: str) -> dict[str, pd.DataFrame]:
    """Build all candidate factor signals."""
    factors = {}

    # Known baselines
    factors["momentum_12_1"] = factor_momentum_12_1(prices)
    factors["recovery"] = factor_recovery(prices)
    factors["inverse_vol"] = factor_inverse_vol(prices)

    # New candidates — universal
    factors["short_term_reversal"] = factor_short_term_reversal(prices)
    factors["idio_vol_change"] = factor_idio_vol_change(prices)
    factors["return_acceleration"] = factor_volume_price_divergence(prices)
    factors["drawdown_recovery"] = factor_max_drawdown_recovery(prices)
    factors["neg_skewness"] = factor_skewness(prices)
    factors["range_compression"] = factor_high_low_range(prices)
    factors["mean_rev_zscore"] = factor_mean_reversion_residual(prices)
    factors["up_down_vol_ratio"] = factor_up_down_vol_ratio(prices)
    factors["win_streak"] = factor_consecutive_up_days(prices)
    factors["smooth_momentum"] = factor_gap_momentum(prices)
    factors["recovery_accel"] = factor_recovery_acceleration(prices)
    factors["trend_r2"] = factor_trend_strength(prices)
    factors["cheap_momentum"] = factor_relative_volume_momentum(prices)
    factors["near_52w_high"] = factor_52w_high_distance(prices)
    factors["low_downside_beta"] = factor_downside_beta_proxy(prices)

    # A-share specific (only built for the CN market)
    if market == "cn":
        factors["illiquidity"] = factor_liquidity_premium(prices)
        factors["anti_lottery"] = factor_lottery_demand(prices)
        factors["vol_reversal"] = factor_turnover_reversal(prices)
        factors["low_price"] = factor_price_level(prices)

    return factors


def print_summary_table(results: list[dict], market: str) -> None:
    """Print a ranked summary of all factors."""
    rows = []
    for r in results:
        rows.append({
            "Factor": r["name"],
            "IC_1d": r.get("ic_1d", np.nan),
            "ICIR_1d": r.get("icir_1d", np.nan),
            "IC_5d": r.get("ic_5d", np.nan),
            "ICIR_5d": r.get("icir_5d", np.nan),
            "IC_20d": r.get("ic_20d", np.nan),
            "ICIR_20d": r.get("icir_20d", np.nan),
            "LS_5d_ann": r.get("long_short_ann", np.nan),
            "Mono": r.get("monotonicity", np.nan),
            "Turnover": r.get("turnover", np.nan),
            "IC_stab": r.get("ic_stability", np.nan),
            "IC_worst_yr": r.get("ic_worst_year", np.nan),
        })
    df = pd.DataFrame(rows).set_index("Factor")
    df = df.sort_values("ICIR_5d", ascending=False)

    print(f"\n{'='*100}")
    print(f" FACTOR RESEARCH RESULTS — {market.upper()} MARKET")
    print(f"{'='*100}")
    print("\nRanked by 5-day ICIR (most important metric for tradeable alpha):\n")
    print(df.round(4).to_string())

    # Highlight top factors
    print(f"\n{'='*100}")
    print(" TOP FACTORS (|ICIR_5d| > 0.05 and IC_stability > 0.6)")
    print(f"{'='*100}")
    top = df[(df["ICIR_5d"].abs() > 0.05) & (df["IC_stab"] > 0.6)]
    if top.empty:
        top = df.head(5)
        print(" (No factor met the strict threshold; showing top 5 by ICIR_5d)")
    print(top.round(4).to_string())

    # Quintile details for top factors
    print(f"\n{'='*100}")
    print(" QUINTILE RETURN PROFILES (annualized, 5-day forward)")
    print(f"{'='*100}")
    for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:8]:
        qr = r.get("quintile_returns")
        if qr is not None and not qr.empty:
            q_str = " ".join(f"Q{int(k)}: {v:+.1%}" for k, v in qr.items())
            ls = r.get("long_short_ann", 0)
            print(f" {r['name']:25s} | {q_str} | L/S: {ls:+.1%}")

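
# A worked ICIR example on assumed numbers (illustrative only; not called by
# the pipeline): ICIR is the mean of the daily IC series divided by its
# standard deviation, and the TOP FACTORS screen above uses |ICIR_5d| > 0.05.
def _icir_example() -> float:
    daily_ic = pd.Series([0.05, -0.10, 0.20, 0.03, -0.03])  # hypothetical ICs
    # mean = 0.03, std ~ 0.11 -> ICIR ~ 0.27
    return float(daily_ic.mean() / daily_ic.std())
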
"IC_worst_yr": r.get("ic_worst_year", np.nan), }) df = pd.DataFrame(rows).set_index("Factor") df = df.sort_values("ICIR_5d", ascending=False) print(f"\n{'='*100}") print(f" FACTOR RESEARCH RESULTS — {market.upper()} MARKET") print(f"{'='*100}") print("\nRanked by 5-day ICIR (most important metric for tradeable alpha):\n") print(df.round(4).to_string()) # Highlight top factors print(f"\n{'='*100}") print(" TOP FACTORS (ICIR_5d > 0.05 and IC_stability > 0.6)") print(f"{'='*100}") top = df[(df["ICIR_5d"].abs() > 0.05) & (df["IC_stab"] > 0.6)] if top.empty: top = df.head(5) print(" (No factor met strict threshold; showing top 5 by ICIR_5d)") print(top.round(4).to_string()) # Quintile details for top factors print(f"\n{'='*100}") print(" QUINTILE RETURN PROFILES (annualized, 5-day forward)") print(f"{'='*100}") for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:8]: qr = r.get("quintile_returns") if qr is not None and not qr.empty: q_str = " ".join(f"Q{int(k)}: {v:+.1%}" for k, v in qr.items()) ls = r.get("long_short_ann", 0) print(f" {r['name']:25s} | {q_str} | L/S: {ls:+.1%}") def main(): parser = argparse.ArgumentParser(description="Factor research") parser.add_argument("--market", default="us", choices=["us", "cn"]) parser.add_argument("--years", type=int, default=None, help="Limit to last N years") args = parser.parse_args() market = args.market config = UNIVERSES[market] benchmark = config["benchmark"] print(f"Loading {market.upper()} price data...") prices = data_manager.load(market) # Remove benchmark from stock universe stocks = prices.drop(columns=[benchmark], errors="ignore") if args.years: cutoff = stocks.index[-1] - pd.DateOffset(years=args.years) stocks = stocks[stocks.index >= cutoff] print(f"Universe: {stocks.shape[1]} stocks, {stocks.shape[0]} trading days") print(f"Date range: {stocks.index[0].date()} to {stocks.index[-1].date()}") # Build all factor signals print("\nComputing factor signals...") factors = get_all_factors(stocks, market) # Analyze each factor print("Running factor analysis (this may take a few minutes)...") results = [] for name, signal in factors.items(): print(f" Analyzing: {name}...") r = analyze_factor(name, signal, stocks) results.append(r) # Print results print_summary_table(results, market) # Factor correlation matrix print(f"\n{'='*100}") print(" FACTOR CORRELATION MATRIX (rank-averaged cross-sectional)") print(f"{'='*100}") corr = compute_factor_correlation(factors) # Show only top factors top_names = [r["name"] for r in sorted(results, key=lambda x: abs(x.get("icir_5d", 0)), reverse=True)[:10]] top_names_present = [n for n in top_names if n in corr.index] print(corr.loc[top_names_present, top_names_present].round(2).to_string()) if __name__ == "__main__": main()