From 47755ff63031e377fecf7debcff2901e863a4644 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Thu, 14 May 2026 12:52:55 +0800 Subject: [PATCH] feat: improve US alpha pipeline and regime filters Expand alpha pipeline with additional factors and scoring logic. Update regime filters and add comprehensive test coverage. --- research/regime_filters.py | 5 +- research/us_alpha_pipeline.py | 209 +++++++++++++++++++++++++++----- tests/test_us_alpha_pipeline.py | 112 +++++++++++++++++ 3 files changed, 295 insertions(+), 31 deletions(-) diff --git a/research/regime_filters.py b/research/regime_filters.py index 4d9e4c5..f8c4dd8 100644 --- a/research/regime_filters.py +++ b/research/regime_filters.py @@ -17,7 +17,10 @@ def build_regime_filter(etf_close: pd.DataFrame, market_col: str = "SPY") -> pd. rs = prices.pct_change(RS_WINDOW, fill_method=None) non_market_rs = rs.drop(columns=[market_col], errors="ignore") - leader_ok = non_market_rs.gt(rs[market_col], axis=0).any(axis=1) + if non_market_rs.shape[1] == 0: + leader_ok = pd.Series(True, index=prices.index) + else: + leader_ok = non_market_rs.gt(rs[market_col], axis=0).any(axis=1) regime = (market_ok & leader_ok).astype(bool) return regime.shift(1, fill_value=False) diff --git a/research/us_alpha_pipeline.py b/research/us_alpha_pipeline.py index c654928..6dd1ccd 100644 --- a/research/us_alpha_pipeline.py +++ b/research/us_alpha_pipeline.py @@ -6,6 +6,7 @@ import universe_history as uh from research.event_factors import breakout_after_compression_score from research.regime_filters import build_regime_filter from research.us_alpha_report import summarize_equity_window +from research.us_fundamentals import build_exploratory_fundamental_score from research.us_universe import build_tradable_mask @@ -51,6 +52,35 @@ def _build_equal_weight_portfolio( return raw.div(row_sums, axis=0).fillna(0.0) +def _build_close_only_tradable_mask(close: pd.DataFrame, pit_membership: pd.DataFrame | None) -> pd.DataFrame: + if pit_membership is None: + pit_mask = pd.DataFrame(True, index=close.index, columns=close.columns) + else: + pit_mask = pit_membership.reindex(index=close.index, columns=close.columns, fill_value=False) + pit_mask = pit_mask.where(pit_mask.notna(), False).astype(bool) + + eligible_close = close.where(pit_mask) + lagged_close = eligible_close.shift(1) + price_ok = lagged_close.gt(MIN_PRICE) + history_ok = ( + lagged_close.notna() + .rolling(window=MIN_HISTORY_DAYS, min_periods=MIN_HISTORY_DAYS) + .sum() + .ge(MIN_HISTORY_DAYS) + ) + return (price_ok & history_ok & pit_mask).astype(bool) + + +def _has_ohlcv_inputs(high: pd.DataFrame, low: pd.DataFrame, volume: pd.DataFrame) -> bool: + return not high.empty and not low.empty and not volume.empty and volume.notna().any().any() + + +def _blend_scores(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame: + left_rank = left.rank(axis=1, pct=True, na_option="keep") + right_rank = right.rank(axis=1, pct=True, na_option="keep") + return (left_rank + right_rank) / 2.0 + + def _equity_curve(close: pd.DataFrame, weights: pd.DataFrame) -> pd.Series: """Convert daily weights into a simple close-to-close equity curve.""" returns = close.pct_change(fill_method=None).fillna(0.0) @@ -62,6 +92,14 @@ def _read_panel_csv(path: str) -> pd.DataFrame: return pd.read_csv(path, index_col=0, parse_dates=True).sort_index() +def _read_nonempty_panel_csv(path: str) -> pd.DataFrame: + try: + panel = _read_panel_csv(path) + except FileNotFoundError: + return pd.DataFrame() + return panel if not panel.empty else pd.DataFrame() + + def load_saved_pit_market_data(data_dir: str = "data", prefix: str = "us_pit") -> dict[str, pd.DataFrame]: """Load saved PIT OHLCV panels from disk.""" panels = {} @@ -84,44 +122,105 @@ def load_saved_etf_close(data_dir: str = "data", market: str = "us_etf") -> pd.D data_manager.DATA_DIR = original_data_dir +def _strategy_scores( + close: pd.DataFrame, + high: pd.DataFrame, + low: pd.DataFrame, + volume: pd.DataFrame, + fundamental_score: pd.DataFrame | None = None, +) -> dict[str, pd.DataFrame]: + strategy_scores = {"rank_blend_regime": _price_rank_blend_score(close)} + if _has_ohlcv_inputs(high, low, volume): + strategy_scores["breakout_regime"] = breakout_after_compression_score(close, high, low, volume) + if fundamental_score is not None: + aligned_fundamental = fundamental_score.reindex(index=close.index, columns=close.columns) + strategy_scores["fundamental_regime"] = aligned_fundamental + if "breakout_regime" in strategy_scores: + strategy_scores["breakout_fundamental_regime"] = _blend_scores( + strategy_scores["breakout_regime"], aligned_fundamental + ) + strategy_scores["rank_blend_fundamental_regime"] = _blend_scores( + strategy_scores["rank_blend_regime"], aligned_fundamental + ) + return strategy_scores + + +def build_alpha_equity_curves( + market_data, + etf_close, + pit_membership=None, + top_n=10, + fundamental_score: pd.DataFrame | None = None, +) -> dict[str, pd.Series]: + close = market_data["close"].sort_index() + high = market_data["high"].reindex(index=close.index, columns=close.columns).sort_index() + low = market_data["low"].reindex(index=close.index, columns=close.columns).sort_index() + volume = market_data["volume"].reindex(index=close.index, columns=close.columns).sort_index() + + if _has_ohlcv_inputs(high, low, volume): + tradable_mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=pit_membership, + min_price=MIN_PRICE, + min_dollar_volume=MIN_DOLLAR_VOLUME, + min_history_days=MIN_HISTORY_DAYS, + min_valid_volume_days=MIN_VALID_VOLUME_DAYS, + liquidity_window=LIQUIDITY_WINDOW, + ) + else: + tradable_mask = _build_close_only_tradable_mask(close, pit_membership) + regime_filter = build_regime_filter(etf_close).reindex(close.index, fill_value=False) + + equities = {} + for strategy_name, score in _strategy_scores(close, high, low, volume, fundamental_score).items(): + weights = _build_equal_weight_portfolio(score, tradable_mask, regime_filter, top_n) + equities[strategy_name] = _equity_curve(close, weights) + return equities + + +def summarize_equity_curves(equity_curves: dict[str, pd.Series], windows=(1, 2, 3, 5, 10)) -> pd.DataFrame: + summary_rows = [] + for strategy_name, equity in equity_curves.items(): + for window_years in windows: + summary_rows.append(summarize_equity_window(equity, strategy_name, window_years)) + return pd.DataFrame(summary_rows) + + +def summarize_yearly_returns(equity_curves: dict[str, pd.Series], years: list[int]) -> pd.DataFrame: + eq_df = pd.DataFrame(equity_curves).sort_index() + rows = [] + for year in years: + window = eq_df.loc[(eq_df.index >= pd.Timestamp(year=year, month=1, day=1)) & (eq_df.index <= pd.Timestamp(year=year, month=12, day=31))] + if window.empty: + continue + row = {"Year": year} + for name in eq_df.columns: + series = window[name].dropna() + row[name] = np.nan if len(series) < 2 else (series.iloc[-1] / series.iloc[0] - 1.0) + rows.append(row) + if not rows: + return pd.DataFrame() + return pd.DataFrame(rows).set_index("Year") + + def run_alpha_pipeline( market_data, etf_close, pit_membership=None, windows=(1, 2, 3, 5, 10), top_n=10, + fundamental_score: pd.DataFrame | None = None, ) -> pd.DataFrame: """Run a lightweight strict US alpha pipeline and summarize trailing windows.""" - close = market_data["close"].sort_index() - high = market_data["high"].reindex(index=close.index, columns=close.columns).sort_index() - low = market_data["low"].reindex(index=close.index, columns=close.columns).sort_index() - volume = market_data["volume"].reindex(index=close.index, columns=close.columns).sort_index() - - tradable_mask = build_tradable_mask( - close=close, - volume=volume, + equity_curves = build_alpha_equity_curves( + market_data=market_data, + etf_close=etf_close, pit_membership=pit_membership, - min_price=MIN_PRICE, - min_dollar_volume=MIN_DOLLAR_VOLUME, - min_history_days=MIN_HISTORY_DAYS, - min_valid_volume_days=MIN_VALID_VOLUME_DAYS, - liquidity_window=LIQUIDITY_WINDOW, + top_n=top_n, + fundamental_score=fundamental_score, ) - regime_filter = build_regime_filter(etf_close).reindex(close.index, fill_value=False) - - strategy_scores = { - "breakout_regime": breakout_after_compression_score(close, high, low, volume), - "rank_blend_regime": _price_rank_blend_score(close), - } - - summary_rows = [] - for strategy_name, score in strategy_scores.items(): - weights = _build_equal_weight_portfolio(score, tradable_mask, regime_filter, top_n) - equity = _equity_curve(close, weights) - for window_years in windows: - summary_rows.append(summarize_equity_window(equity, strategy_name, window_years)) - - return pd.DataFrame(summary_rows) + return summarize_equity_curves(equity_curves, windows=windows) def run_saved_pit_alpha_pipeline( @@ -147,9 +246,59 @@ def run_saved_pit_alpha_pipeline( ) +def run_exploratory_fundamental_alpha_pipeline( + data_dir: str = "data", + market: str = "us_alpha_exploratory", + windows=(1, 2, 3, 5, 10), + top_n: int = 10, +) -> tuple[pd.DataFrame, pd.DataFrame]: + cached_close = _read_panel_csv(f"{data_dir}/us.csv") + stock_tickers = [ticker for ticker in cached_close.columns if ticker not in ETF_TICKERS] + saved_close = _read_nonempty_panel_csv(f"{data_dir}/{market}.csv") + saved_high = _read_nonempty_panel_csv(f"{data_dir}/{market}_high.csv") + saved_low = _read_nonempty_panel_csv(f"{data_dir}/{market}_low.csv") + saved_volume = _read_nonempty_panel_csv(f"{data_dir}/{market}_volume.csv") + saved_etf = _read_nonempty_panel_csv(f"{data_dir}/us_etf.csv") + + if not saved_close.empty and not saved_high.empty and not saved_low.empty and not saved_volume.empty: + market_data = { + "close": saved_close.reindex(columns=stock_tickers), + "high": saved_high.reindex(columns=stock_tickers), + "low": saved_low.reindex(columns=stock_tickers), + "volume": saved_volume.reindex(columns=stock_tickers), + } + else: + close = cached_close.reindex(columns=stock_tickers) + market_data = { + "close": close, + "high": pd.DataFrame(index=close.index, columns=close.columns, dtype=float), + "low": pd.DataFrame(index=close.index, columns=close.columns, dtype=float), + "volume": pd.DataFrame(index=close.index, columns=close.columns, dtype=float), + } + etf_close = saved_etf if not saved_etf.empty and "SPY" in saved_etf.columns else cached_close.reindex(columns=["SPY"]).dropna(how="all") + + fundamental_score = build_exploratory_fundamental_score(market_data["close"], data_dir=data_dir) + equity_curves = build_alpha_equity_curves( + market_data=market_data, + etf_close=etf_close, + pit_membership=None, + top_n=top_n, + fundamental_score=fundamental_score, + ) + windows_df = summarize_equity_curves(equity_curves, windows=windows) + years = list(range(int(market_data["close"].index.min().year), int(market_data["close"].index.max().year) + 1)) + yearly_df = summarize_yearly_returns(equity_curves, years) + windows_df.to_csv(f"{data_dir}/us_alpha_fundamental_windows.csv", index=False) + yearly_df.to_csv(f"{data_dir}/us_alpha_fundamental_10y_yearly.csv") + return windows_df, yearly_df + + def main() -> None: - summary = run_saved_pit_alpha_pipeline() - print(summary.to_string(index=False)) + windows_df, yearly_df = run_exploratory_fundamental_alpha_pipeline() + print("=== Window Summary ===") + print(windows_df.to_string(index=False)) + print("\n=== Yearly Returns ===") + print((yearly_df * 100.0).round(2).to_string()) if __name__ == "__main__": diff --git a/tests/test_us_alpha_pipeline.py b/tests/test_us_alpha_pipeline.py index 754114f..0d8418c 100644 --- a/tests/test_us_alpha_pipeline.py +++ b/tests/test_us_alpha_pipeline.py @@ -124,6 +124,118 @@ class USAlphaPipelineTests(unittest.TestCase): self.assertEqual(len(summary), 2) self.assertTrue(summary[["CAGR", "Sharpe", "MaxDD", "TotalRet"]].notna().all().all()) + def test_run_alpha_pipeline_includes_fundamental_variants_when_score_is_supplied(self): + from research.us_alpha_pipeline import run_alpha_pipeline + + dates = pd.date_range("2023-01-01", periods=400, freq="D") + close = pd.DataFrame( + { + "AAA": [50.0 + 0.20 * i for i in range(400)], + "BBB": [55.0 + 0.12 * i for i in range(400)], + "CCC": [60.0 + 0.05 * i for i in range(400)], + }, + index=dates, + ) + high = close + 1.0 + low = close - 1.0 + volume = pd.DataFrame( + { + "AAA": [1_500_000.0] * 400, + "BBB": [1_400_000.0] * 400, + "CCC": [1_300_000.0] * 400, + }, + index=dates, + ) + etf_close = pd.DataFrame( + { + "SPY": [300.0 + 0.8 * i for i in range(400)], + "QQQ": [280.0 + 1.1 * i for i in range(400)], + "XLF": [200.0 + 0.4 * i for i in range(400)], + }, + index=dates, + ) + market_data = { + "close": close, + "high": high, + "low": low, + "volume": volume, + } + fundamental_score = pd.DataFrame( + { + "AAA": [0.9] * 400, + "BBB": [0.6] * 400, + "CCC": [0.3] * 400, + }, + index=dates, + ) + + summary = run_alpha_pipeline( + market_data=market_data, + etf_close=etf_close, + pit_membership=None, + windows=(1,), + top_n=2, + fundamental_score=fundamental_score, + ) + + self.assertEqual( + set(summary["strategy"]), + { + "breakout_regime", + "rank_blend_regime", + "fundamental_regime", + "breakout_fundamental_regime", + "rank_blend_fundamental_regime", + }, + ) + + def test_run_alpha_pipeline_close_only_fallback_skips_breakout_and_uses_spy_regime(self): + from research.us_alpha_pipeline import run_alpha_pipeline + + dates = pd.date_range("2023-01-01", periods=400, freq="D") + close = pd.DataFrame( + { + "AAA": [50.0 + 0.20 * i for i in range(400)], + "BBB": [55.0 + 0.12 * i for i in range(400)], + "CCC": [60.0 + 0.05 * i for i in range(400)], + }, + index=dates, + ) + market_data = { + "close": close, + "high": pd.DataFrame(index=dates, columns=close.columns), + "low": pd.DataFrame(index=dates, columns=close.columns), + "volume": pd.DataFrame(index=dates, columns=close.columns), + } + etf_close = pd.DataFrame({"SPY": [300.0 + 0.8 * i for i in range(400)]}, index=dates) + fundamental_score = pd.DataFrame( + { + "AAA": [0.9] * 400, + "BBB": [0.6] * 400, + "CCC": [0.3] * 400, + }, + index=dates, + ) + + summary = run_alpha_pipeline( + market_data=market_data, + etf_close=etf_close, + pit_membership=None, + windows=(1,), + top_n=2, + fundamental_score=fundamental_score, + ) + + self.assertEqual( + set(summary["strategy"]), + { + "rank_blend_regime", + "fundamental_regime", + "rank_blend_fundamental_regime", + }, + ) + self.assertTrue(summary[["CAGR", "Sharpe", "MaxDD", "TotalRet"]].notna().all().all()) + def test_run_saved_pit_alpha_pipeline_reads_saved_inputs(self): from research.us_alpha_pipeline import run_saved_pit_alpha_pipeline