feat: add strict US alpha research pipeline

This commit is contained in:
2026-04-18 00:38:29 +08:00
parent bf6fccfd11
commit c015873ee1
3 changed files with 245 additions and 0 deletions

View File

@@ -0,0 +1,95 @@
import numpy as np
import pandas as pd
from research.event_factors import breakout_after_compression_score
from research.regime_filters import build_regime_filter
from research.us_alpha_report import summarize_equity_window
from research.us_universe import build_tradable_mask
MIN_PRICE = 5.0
MIN_DOLLAR_VOLUME = 20_000_000.0
MIN_HISTORY_DAYS = 252
MIN_VALID_VOLUME_DAYS = 40
LIQUIDITY_WINDOW = 60
TREND_WINDOW = 126
RECOVERY_WINDOW = 63
HIGH_PROX_WINDOW = 126
def _price_rank_blend_score(close: pd.DataFrame) -> pd.DataFrame:
"""Simple price-only cross-sectional blend, shifted for next-day trading."""
trend = close.pct_change(TREND_WINDOW, fill_method=None)
recovery = close / close.rolling(RECOVERY_WINDOW, min_periods=RECOVERY_WINDOW).min() - 1
high_proximity = close / close.rolling(HIGH_PROX_WINDOW, min_periods=HIGH_PROX_WINDOW).max().replace(0, np.nan)
trend_rank = trend.rank(axis=1, pct=True, na_option="keep")
recovery_rank = recovery.rank(axis=1, pct=True, na_option="keep")
high_rank = high_proximity.rank(axis=1, pct=True, na_option="keep")
return ((trend_rank + recovery_rank + high_rank) / 3.0).shift(1)
def _build_equal_weight_portfolio(
score: pd.DataFrame,
tradable_mask: pd.DataFrame,
regime_filter: pd.Series,
top_n: int,
) -> pd.DataFrame:
"""Build equal-weight top-n long-only weights from aligned scores."""
aligned_score = score.reindex(index=tradable_mask.index, columns=tradable_mask.columns)
eligible_score = aligned_score.where(tradable_mask)
rank = eligible_score.rank(axis=1, ascending=False, na_option="bottom", method="first")
selected = (rank <= top_n) & eligible_score.notna()
selected = selected & regime_filter.reindex(tradable_mask.index, fill_value=False).to_numpy().reshape(-1, 1)
raw = selected.astype(float)
row_sums = raw.sum(axis=1).replace(0.0, np.nan)
return raw.div(row_sums, axis=0).fillna(0.0)
def _equity_curve(close: pd.DataFrame, weights: pd.DataFrame) -> pd.Series:
"""Convert daily weights into a simple close-to-close equity curve."""
returns = close.pct_change(fill_method=None).fillna(0.0)
portfolio_returns = (returns * weights.shift(1).fillna(0.0)).sum(axis=1)
return (1.0 + portfolio_returns).cumprod()
def run_alpha_pipeline(
market_data,
etf_close,
pit_membership=None,
windows=(1, 2, 3, 5, 10),
top_n=10,
) -> pd.DataFrame:
"""Run a lightweight strict US alpha pipeline and summarize trailing windows."""
close = market_data["close"].sort_index()
high = market_data["high"].reindex(index=close.index, columns=close.columns).sort_index()
low = market_data["low"].reindex(index=close.index, columns=close.columns).sort_index()
volume = market_data["volume"].reindex(index=close.index, columns=close.columns).sort_index()
tradable_mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=pit_membership,
min_price=MIN_PRICE,
min_dollar_volume=MIN_DOLLAR_VOLUME,
min_history_days=MIN_HISTORY_DAYS,
min_valid_volume_days=MIN_VALID_VOLUME_DAYS,
liquidity_window=LIQUIDITY_WINDOW,
)
regime_filter = build_regime_filter(etf_close).reindex(close.index, fill_value=False)
strategy_scores = {
"breakout_regime": breakout_after_compression_score(close, high, low, volume),
"rank_blend_regime": _price_rank_blend_score(close),
}
summary_rows = []
for strategy_name, score in strategy_scores.items():
weights = _build_equal_weight_portfolio(score, tradable_mask, regime_filter, top_n)
equity = _equity_curve(close, weights)
for window_years in windows:
summary_rows.append(summarize_equity_window(equity, strategy_name, window_years))
return pd.DataFrame(summary_rows)

View File

@@ -0,0 +1,36 @@
import numpy as np
import pandas as pd
TRADING_DAYS_PER_YEAR = 252
def summarize_equity_window(equity: pd.Series, strategy: str, window_years: int | float) -> dict:
"""Summarize a strategy equity curve over a trailing trading-day window."""
window_days = max(int(window_years * TRADING_DAYS_PER_YEAR), 1)
window_equity = equity.tail(window_days + 1).dropna()
if len(window_equity) < 2:
return {
"strategy": strategy,
"window_years": window_years,
"CAGR": np.nan,
"Sharpe": np.nan,
"MaxDD": np.nan,
"TotalRet": np.nan,
}
daily = window_equity.pct_change(fill_method=None).dropna()
total_ret = window_equity.iloc[-1] / window_equity.iloc[0] - 1
years = len(daily) / TRADING_DAYS_PER_YEAR
cagr = (window_equity.iloc[-1] / window_equity.iloc[0]) ** (1 / years) - 1 if years > 0 else np.nan
vol = daily.std() * np.sqrt(TRADING_DAYS_PER_YEAR)
sharpe = (daily.mean() * TRADING_DAYS_PER_YEAR) / vol if vol > 0 else 0.0
max_dd = (window_equity / window_equity.cummax() - 1).min()
return {
"strategy": strategy,
"window_years": window_years,
"CAGR": cagr,
"Sharpe": sharpe,
"MaxDD": max_dd,
"TotalRet": total_ret,
}