# File: research/us_alpha_pipeline.py
import numpy as np
import pandas as pd

from research.event_factors import breakout_after_compression_score
from research.regime_filters import build_regime_filter
from research.us_alpha_report import summarize_equity_window
from research.us_universe import build_tradable_mask


# Universe thresholds forwarded verbatim to ``build_tradable_mask``.
MIN_PRICE = 5.0
MIN_DOLLAR_VOLUME = 20_000_000.0
MIN_HISTORY_DAYS = 252
MIN_VALID_VOLUME_DAYS = 40
LIQUIDITY_WINDOW = 60

# Look-back lengths (in rows) used by the price-only rank blend.
TREND_WINDOW = 126
RECOVERY_WINDOW = 63
HIGH_PROX_WINDOW = 126

# Columns of the summary frame; these mirror the keys of the dict returned by
# ``summarize_equity_window`` so an empty result still has a usable schema.
_SUMMARY_COLUMNS = ("strategy", "window_years", "CAGR", "Sharpe", "MaxDD", "TotalRet")


def _price_rank_blend_score(close: pd.DataFrame) -> pd.DataFrame:
    """Return a price-only cross-sectional rank blend, lagged by one row.

    Three components are computed per column and then percentile-ranked
    across columns (NaN inputs keep a NaN rank):

    * trend: percentage change over ``TREND_WINDOW`` rows;
    * recovery: close relative to its rolling ``RECOVERY_WINDOW`` minimum;
    * high proximity: close relative to its rolling ``HIGH_PROX_WINDOW``
      maximum.

    Args:
        close: Close prices, one row per date and one column per symbol.

    Returns:
        The mean of the three ranks, shifted down by one row so that the
        score on a given row is built only from earlier rows.
    """
    trend = close.pct_change(TREND_WINDOW, fill_method=None)

    rolling_low = close.rolling(RECOVERY_WINDOW, min_periods=RECOVERY_WINDOW).min()
    rolling_high = close.rolling(HIGH_PROX_WINDOW, min_periods=HIGH_PROX_WINDOW).max()

    # Guard both denominators the same way: a zero price would otherwise
    # produce +inf, which ranks as the best score in the cross-section.
    recovery = close / rolling_low.replace(0, np.nan) - 1
    high_proximity = close / rolling_high.replace(0, np.nan)

    trend_rank = trend.rank(axis=1, pct=True, na_option="keep")
    recovery_rank = recovery.rank(axis=1, pct=True, na_option="keep")
    high_rank = high_proximity.rank(axis=1, pct=True, na_option="keep")

    blended = (trend_rank + recovery_rank + high_rank) / 3.0
    return blended.shift(1)


def _build_equal_weight_portfolio(
    score: pd.DataFrame,
    tradable_mask: pd.DataFrame,
    regime_filter: pd.Series,
    top_n: int,
) -> pd.DataFrame:
    """Build equal-weight, long-only weights for the ``top_n`` scores per row.

    Args:
        score: Scores (higher is better); reindexed onto ``tradable_mask``.
        tradable_mask: Boolean frame; scores outside the mask are ignored.
        regime_filter: Per-row on/off switch. Rows that are missing from the
            series, or whose value is NaN, are treated as "off".
        top_n: Maximum number of holdings per row.

    Returns:
        A frame shaped like ``tradable_mask`` whose rows sum to 1.0 when at
        least one name is selected and are all 0.0 otherwise.
    """
    aligned_score = score.reindex(index=tradable_mask.index, columns=tradable_mask.columns)
    eligible_score = aligned_score.where(tradable_mask)

    # method="first" breaks ties by column order so ties can never push the
    # number of holdings above top_n; NaN scores are ranked last and then
    # removed explicitly by the notna() mask.
    rank = eligible_score.rank(axis=1, ascending=False, na_option="bottom", method="first")
    selected = (rank <= top_n) & eligible_score.notna()

    # Normalise the regime switch to a strict boolean vector. Without this a
    # NaN in the series yields a non-boolean array and the `&` below fails.
    regime_on = (
        regime_filter.reindex(tradable_mask.index, fill_value=False)
        .astype("boolean")
        .fillna(False)
        .to_numpy(dtype=bool)
    )
    selected = selected & regime_on.reshape(-1, 1)

    raw = selected.astype(float)
    # Rows with no selection divide by NaN and are zero-filled afterwards.
    row_sums = raw.sum(axis=1).replace(0.0, np.nan)
    return raw.div(row_sums, axis=0).fillna(0.0)


def _equity_curve(close: pd.DataFrame, weights: pd.DataFrame) -> pd.Series:
    """Convert daily weights into a close-to-close equity curve.

    The weights on row ``t`` earn the return realised on row ``t + 1``
    (weights are shifted by one row before being applied).

    Args:
        close: Close prices, one column per symbol.
        weights: Portfolio weights aligned with ``close``.

    Returns:
        Cumulative product of ``1 + portfolio return`` per row.
    """
    asset_returns = close.pct_change(fill_method=None).fillna(0.0)
    lagged_weights = weights.shift(1).fillna(0.0)
    portfolio_returns = (asset_returns * lagged_weights).sum(axis=1)
    return (1.0 + portfolio_returns).cumprod()


def run_alpha_pipeline(
    market_data,
    etf_close,
    pit_membership=None,
    windows=(1, 2, 3, 5, 10),
    top_n=10,
) -> pd.DataFrame:
    """Run a lightweight strict US alpha pipeline and summarize trailing windows.

    Args:
        market_data: Mapping with ``"close"``, ``"high"``, ``"low"`` and
            ``"volume"`` frames. High, low and volume are reindexed onto the
            sorted close index and the close columns.
        etf_close: Input passed to ``build_regime_filter``.
        pit_membership: Forwarded unchanged to ``build_tradable_mask``.
        windows: Trailing window lengths in years; may be any iterable.
        top_n: Maximum number of holdings per day for each strategy.

    Returns:
        One row per (strategy, window) pair, strategies first and windows in
        the order given. With no windows an empty frame carrying the summary
        column names is returned.
    """
    # Materialise once: the windows are walked once per strategy, so a
    # generator would be exhausted after the first strategy.
    windows = tuple(windows)

    close = market_data["close"].sort_index()

    def _aligned(field: str) -> pd.DataFrame:
        # Reindexing onto the sorted close index already yields sorted rows.
        return market_data[field].reindex(index=close.index, columns=close.columns)

    high = _aligned("high")
    low = _aligned("low")
    volume = _aligned("volume")

    tradable_mask = build_tradable_mask(
        close=close,
        volume=volume,
        pit_membership=pit_membership,
        min_price=MIN_PRICE,
        min_dollar_volume=MIN_DOLLAR_VOLUME,
        min_history_days=MIN_HISTORY_DAYS,
        min_valid_volume_days=MIN_VALID_VOLUME_DAYS,
        liquidity_window=LIQUIDITY_WINDOW,
    )
    regime_filter = build_regime_filter(etf_close).reindex(close.index, fill_value=False)

    strategy_scores = {
        "breakout_regime": breakout_after_compression_score(close, high, low, volume),
        "rank_blend_regime": _price_rank_blend_score(close),
    }

    summary_rows = []
    for strategy_name, score in strategy_scores.items():
        weights = _build_equal_weight_portfolio(score, tradable_mask, regime_filter, top_n)
        equity = _equity_curve(close, weights)
        summary_rows.extend(
            summarize_equity_window(equity, strategy_name, window_years)
            for window_years in windows
        )

    if not summary_rows:
        return pd.DataFrame(columns=list(_SUMMARY_COLUMNS))
    return pd.DataFrame(summary_rows)
# File: research/us_alpha_report.py
import numpy as np
import pandas as pd


TRADING_DAYS_PER_YEAR = 252


def _summary_row(strategy, window_years, cagr, sharpe, max_dd, total_ret) -> dict:
    """Assemble one summary record with the fixed set of result keys."""
    return {
        "strategy": strategy,
        "window_years": window_years,
        "CAGR": cagr,
        "Sharpe": sharpe,
        "MaxDD": max_dd,
        "TotalRet": total_ret,
    }


def summarize_equity_window(equity: pd.Series, strategy: str, window_years: int | float) -> dict:
    """Summarize a strategy equity curve over a trailing trading-day window.

    Args:
        equity: Equity curve; only the trailing window is used and NaN
            points inside it are dropped.
        strategy: Label copied into the result.
        window_years: Window length in years, converted to rows with
            ``TRADING_DAYS_PER_YEAR`` (at least one row).

    Returns:
        A dict with keys ``strategy``, ``window_years``, ``CAGR``,
        ``Sharpe``, ``MaxDD`` and ``TotalRet``. All four metrics are NaN
        when fewer than two equity points are available. ``Sharpe`` is NaN
        when volatility cannot be estimated (a single return observation)
        and 0.0 when volatility is exactly zero.
    """
    window_days = max(int(window_years * TRADING_DAYS_PER_YEAR), 1)
    # One extra point is needed so that window_days returns can be formed.
    window_equity = equity.tail(window_days + 1).dropna()
    if len(window_equity) < 2:
        return _summary_row(strategy, window_years, np.nan, np.nan, np.nan, np.nan)

    daily = window_equity.pct_change(fill_method=None).dropna()
    growth = window_equity.iloc[-1] / window_equity.iloc[0]
    total_ret = growth - 1

    # Annualise over the number of returns actually observed, which can be
    # shorter than the requested window.
    years = len(daily) / TRADING_DAYS_PER_YEAR
    cagr = growth ** (1 / years) - 1 if years > 0 else np.nan

    vol = daily.std() * np.sqrt(TRADING_DAYS_PER_YEAR)
    if np.isnan(vol):
        # The sample std of a single observation is undefined; reporting 0.0
        # here would be indistinguishable from a genuinely flat curve.
        sharpe = np.nan
    elif vol > 0:
        sharpe = (daily.mean() * TRADING_DAYS_PER_YEAR) / vol
    else:
        sharpe = 0.0

    max_dd = (window_equity / window_equity.cummax() - 1).min()
    return _summary_row(strategy, window_years, cagr, sharpe, max_dd, total_ret)


# File: tests/test_us_alpha_pipeline.py
import unittest

import pandas as pd


class USAlphaPipelineTests(unittest.TestCase):
    """Unit tests for the helpers and entry point of the US alpha pipeline."""

    def test_build_equal_weight_portfolio_caps_holdings_under_ties(self):
        """Tied scores must not push the number of holdings above top_n."""
        from research.us_alpha_pipeline import _build_equal_weight_portfolio

        dates = pd.date_range("2024-01-01", periods=2, freq="D")
        score = pd.DataFrame(
            {
                "AAA": [0.9, 0.9],
                "BBB": [0.9, 0.9],
                "CCC": [0.9, 0.9],
            },
            index=dates,
        )
        tradable_mask = pd.DataFrame(True, index=dates, columns=score.columns)
        regime = pd.Series([True, True], index=dates)

        weights = _build_equal_weight_portfolio(score, tradable_mask, regime, top_n=2)

        self.assertEqual(int((weights.iloc[-1] > 0).sum()), 2)
        self.assertAlmostEqual(float(weights.iloc[-1].sum()), 1.0)

    def test_equity_curve_uses_prior_day_weights_for_returns(self):
        """Weights set on one day must only earn the following day's return."""
        from research.us_alpha_pipeline import _equity_curve

        dates = pd.date_range("2024-01-01", periods=3, freq="D")
        close = pd.DataFrame({"AAA": [1.0, 2.0, 4.0]}, index=dates)
        weights = pd.DataFrame({"AAA": [0.0, 1.0, 0.0]}, index=dates)

        equity = _equity_curve(close, weights)

        self.assertEqual(float(equity.iloc[1]), 1.0)
        self.assertEqual(float(equity.iloc[2]), 2.0)

    def test_run_alpha_pipeline_returns_expected_strategy_summary(self):
        """The pipeline yields one fully populated row per strategy and window."""
        from research.us_alpha_pipeline import run_alpha_pipeline

        dates = pd.date_range("2023-01-01", periods=400, freq="D")

        aaa_close = [50.0 + 0.20 * i for i in range(400)]
        bbb_close = [55.0 + 0.12 * i for i in range(400)]
        ccc_close = [60.0 + 0.05 * i for i in range(400)]
        close = pd.DataFrame(
            {
                "AAA": aaa_close,
                "BBB": bbb_close,
                "CCC": ccc_close,
            },
            index=dates,
        )
        high = pd.DataFrame(
            {
                "AAA": [value + 0.5 for value in aaa_close],
                "BBB": [value + 1.0 for value in bbb_close],
                "CCC": [value + 1.5 for value in ccc_close],
            },
            index=dates,
        )
        low = pd.DataFrame(
            {
                "AAA": [value - 0.5 for value in aaa_close],
                "BBB": [value - 1.0 for value in bbb_close],
                "CCC": [value - 1.5 for value in ccc_close],
            },
            index=dates,
        )
        volume = pd.DataFrame(
            {
                "AAA": [1_500_000.0] * 400,
                "BBB": [1_400_000.0] * 400,
                "CCC": [1_300_000.0] * 400,
            },
            index=dates,
        )
        # Single volume spike on the second-to-last day for AAA.
        volume.loc[dates[-2], "AAA"] = 4_000_000.0

        etf_close = pd.DataFrame(
            {
                "SPY": [300.0 + 0.8 * i for i in range(400)],
                "QQQ": [280.0 + 1.1 * i for i in range(400)],
                "XLF": [200.0 + 0.4 * i for i in range(400)],
            },
            index=dates,
        )

        market_data = {
            "close": close,
            "high": high,
            "low": low,
            "volume": volume,
        }

        summary = run_alpha_pipeline(
            market_data=market_data,
            etf_close=etf_close,
            pit_membership=None,
            windows=(1,),
            top_n=2,
        )

        required_columns = {"strategy", "window_years", "CAGR", "Sharpe", "MaxDD", "TotalRet"}
        self.assertTrue(required_columns.issubset(summary.columns))
        self.assertEqual(set(summary["strategy"]), {"breakout_regime", "rank_blend_regime"})
        self.assertEqual(set(summary["window_years"]), {1})
        self.assertEqual(len(summary), 2)
        self.assertTrue(summary[["CAGR", "Sharpe", "MaxDD", "TotalRet"]].notna().all().all())


if __name__ == "__main__":
    unittest.main()