feat: improve US alpha pipeline and regime filters
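Expand the US alpha pipeline with additional factors and scoring logic: optional fundamental-score strategies (standalone and rank-blended with the breakout and price rank-blend scores) and a close-only fallback for when OHLCV panels are unavailable. Update the regime filter so the leader check passes when no non-market ETF columns are present, and add test coverage for the fundamental and close-only paths.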
This commit is contained in:
@@ -17,6 +17,9 @@ def build_regime_filter(etf_close: pd.DataFrame, market_col: str = "SPY") -> pd.
|
||||
|
||||
rs = prices.pct_change(RS_WINDOW, fill_method=None)
|
||||
non_market_rs = rs.drop(columns=[market_col], errors="ignore")
|
||||
if non_market_rs.shape[1] == 0:
|
||||
leader_ok = pd.Series(True, index=prices.index)
|
||||
else:
|
||||
leader_ok = non_market_rs.gt(rs[market_col], axis=0).any(axis=1)
|
||||
|
||||
regime = (market_ok & leader_ok).astype(bool)
|
||||
|
||||
@@ -6,6 +6,7 @@ import universe_history as uh
|
||||
from research.event_factors import breakout_after_compression_score
|
||||
from research.regime_filters import build_regime_filter
|
||||
from research.us_alpha_report import summarize_equity_window
|
||||
from research.us_fundamentals import build_exploratory_fundamental_score
|
||||
from research.us_universe import build_tradable_mask
|
||||
|
||||
|
||||
@@ -51,6 +52,35 @@ def _build_equal_weight_portfolio(
|
||||
return raw.div(row_sums, axis=0).fillna(0.0)
|
||||
|
||||
|
||||
def _build_close_only_tradable_mask(close: pd.DataFrame, pit_membership: pd.DataFrame | None) -> pd.DataFrame:
    """Build a boolean tradable mask from close prices alone.

    A cell is tradable when the membership mask is true for it, the previous
    row's eligible close is above ``MIN_PRICE``, and the trailing
    ``MIN_HISTORY_DAYS`` rows of previous-row closes are all present.

    Args:
        close: Close-price panel; its index and columns define the output shape.
        pit_membership: Optional membership panel. When ``None`` every cell is
            treated as a member; otherwise it is reindexed to ``close`` with
            absent cells counted as non-members.

    Returns:
        Boolean DataFrame aligned to ``close``.
    """
    if pit_membership is not None:
        members = pit_membership.reindex(index=close.index, columns=close.columns, fill_value=False)
        # Any NaN left in the membership panel is treated as "not a member".
        members = members.where(members.notna(), False).astype(bool)
    else:
        members = pd.DataFrame(True, index=close.index, columns=close.columns)

    # Shift by one row so the mask for a row only uses the prior row's close.
    prior_close = close.where(members).shift(1)

    above_min_price = prior_close.gt(MIN_PRICE)
    observed_days = (
        prior_close.notna()
        .rolling(window=MIN_HISTORY_DAYS, min_periods=MIN_HISTORY_DAYS)
        .sum()
    )
    enough_history = observed_days.ge(MIN_HISTORY_DAYS)

    return (above_min_price & enough_history & members).astype(bool)
|
||||
|
||||
|
||||
def _has_ohlcv_inputs(high: pd.DataFrame, low: pd.DataFrame, volume: pd.DataFrame) -> bool:
|
||||
return not high.empty and not low.empty and not volume.empty and volume.notna().any().any()
|
||||
|
||||
|
||||
def _blend_scores(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
|
||||
left_rank = left.rank(axis=1, pct=True, na_option="keep")
|
||||
right_rank = right.rank(axis=1, pct=True, na_option="keep")
|
||||
return (left_rank + right_rank) / 2.0
|
||||
|
||||
|
||||
def _equity_curve(close: pd.DataFrame, weights: pd.DataFrame) -> pd.Series:
|
||||
"""Convert daily weights into a simple close-to-close equity curve."""
|
||||
returns = close.pct_change(fill_method=None).fillna(0.0)
|
||||
@@ -62,6 +92,14 @@ def _read_panel_csv(path: str) -> pd.DataFrame:
|
||||
return pd.read_csv(path, index_col=0, parse_dates=True).sort_index()
|
||||
|
||||
|
||||
def _read_nonempty_panel_csv(path: str) -> pd.DataFrame:
    """Read a panel CSV, returning an empty frame when no usable data exists.

    Args:
        path: Location of the CSV file to load via ``_read_panel_csv``.

    Returns:
        The loaded panel, or a fresh empty ``DataFrame`` when the file is
        missing, is a zero-byte file, or parses to an empty panel.
    """
    try:
        panel = _read_panel_csv(path)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # pandas raises EmptyDataError for a file with nothing to parse; treat
        # that the same as a missing file so callers can fall back cleanly.
        return pd.DataFrame()
    return pd.DataFrame() if panel.empty else panel
|
||||
|
||||
|
||||
def load_saved_pit_market_data(data_dir: str = "data", prefix: str = "us_pit") -> dict[str, pd.DataFrame]:
|
||||
"""Load saved PIT OHLCV panels from disk."""
|
||||
panels = {}
|
||||
@@ -84,19 +122,42 @@ def load_saved_etf_close(data_dir: str = "data", market: str = "us_etf") -> pd.D
|
||||
data_manager.DATA_DIR = original_data_dir
|
||||
|
||||
|
||||
def run_alpha_pipeline(
|
||||
def _strategy_scores(
    close: pd.DataFrame,
    high: pd.DataFrame,
    low: pd.DataFrame,
    volume: pd.DataFrame,
    fundamental_score: pd.DataFrame | None = None,
) -> dict[str, pd.DataFrame]:
    """Assemble the score panel for every strategy that the inputs support.

    The price rank-blend score is always produced. The breakout score is added
    only when ``_has_ohlcv_inputs`` accepts the high/low/volume panels. When a
    fundamental score is supplied it is aligned to ``close`` and added on its
    own, plus blended with each price-based score that exists.

    Returns:
        Mapping of strategy name to score DataFrame.
    """
    scores: dict[str, pd.DataFrame] = {"rank_blend_regime": _price_rank_blend_score(close)}

    has_breakout = _has_ohlcv_inputs(high, low, volume)
    if has_breakout:
        scores["breakout_regime"] = breakout_after_compression_score(close, high, low, volume)

    if fundamental_score is None:
        return scores

    # Align the fundamental panel to the price panel before ranking/blending.
    fundamentals = fundamental_score.reindex(index=close.index, columns=close.columns)
    scores["fundamental_regime"] = fundamentals
    if has_breakout:
        scores["breakout_fundamental_regime"] = _blend_scores(scores["breakout_regime"], fundamentals)
    scores["rank_blend_fundamental_regime"] = _blend_scores(scores["rank_blend_regime"], fundamentals)
    return scores
|
||||
|
||||
|
||||
def build_alpha_equity_curves(
|
||||
market_data,
|
||||
etf_close,
|
||||
pit_membership=None,
|
||||
windows=(1, 2, 3, 5, 10),
|
||||
top_n=10,
|
||||
) -> pd.DataFrame:
|
||||
"""Run a lightweight strict US alpha pipeline and summarize trailing windows."""
|
||||
fundamental_score: pd.DataFrame | None = None,
|
||||
) -> dict[str, pd.Series]:
|
||||
close = market_data["close"].sort_index()
|
||||
high = market_data["high"].reindex(index=close.index, columns=close.columns).sort_index()
|
||||
low = market_data["low"].reindex(index=close.index, columns=close.columns).sort_index()
|
||||
volume = market_data["volume"].reindex(index=close.index, columns=close.columns).sort_index()
|
||||
|
||||
if _has_ohlcv_inputs(high, low, volume):
|
||||
tradable_mask = build_tradable_mask(
|
||||
close=close,
|
||||
volume=volume,
|
||||
@@ -107,23 +168,61 @@ def run_alpha_pipeline(
|
||||
min_valid_volume_days=MIN_VALID_VOLUME_DAYS,
|
||||
liquidity_window=LIQUIDITY_WINDOW,
|
||||
)
|
||||
else:
|
||||
tradable_mask = _build_close_only_tradable_mask(close, pit_membership)
|
||||
regime_filter = build_regime_filter(etf_close).reindex(close.index, fill_value=False)
|
||||
|
||||
strategy_scores = {
|
||||
"breakout_regime": breakout_after_compression_score(close, high, low, volume),
|
||||
"rank_blend_regime": _price_rank_blend_score(close),
|
||||
}
|
||||
|
||||
summary_rows = []
|
||||
for strategy_name, score in strategy_scores.items():
|
||||
equities = {}
|
||||
for strategy_name, score in _strategy_scores(close, high, low, volume, fundamental_score).items():
|
||||
weights = _build_equal_weight_portfolio(score, tradable_mask, regime_filter, top_n)
|
||||
equity = _equity_curve(close, weights)
|
||||
equities[strategy_name] = _equity_curve(close, weights)
|
||||
return equities
|
||||
|
||||
|
||||
def summarize_equity_curves(equity_curves: dict[str, pd.Series], windows=(1, 2, 3, 5, 10)) -> pd.DataFrame:
    """Summarize every equity curve over every trailing window.

    Args:
        equity_curves: Mapping of strategy name to its equity curve.
        windows: Window lengths passed to ``summarize_equity_window`` as the
            ``window_years`` argument.

    Returns:
        DataFrame with one row per (strategy, window) pair, ordered by strategy
        first and window second.
    """
    return pd.DataFrame(
        [
            summarize_equity_window(curve, name, years_back)
            for name, curve in equity_curves.items()
            for years_back in windows
        ]
    )
|
||||
|
||||
|
||||
def summarize_yearly_returns(equity_curves: dict[str, pd.Series], years: list[int]) -> pd.DataFrame:
    """Compute per-calendar-year returns for each equity curve.

    For every requested year the curves are restricted to rows between
    January 1 and December 31 of that year; the return is the last non-NaN
    value divided by the first non-NaN value, minus one. A curve with fewer
    than two non-NaN points in the year gets NaN. Years with no rows at all
    are omitted.

    Args:
        equity_curves: Mapping of strategy name to its equity curve.
        years: Calendar years to report.

    Returns:
        DataFrame indexed by ``Year`` with one column per strategy, or an
        empty DataFrame when no requested year has data.
    """
    curves = pd.DataFrame(equity_curves).sort_index()

    def _first_to_last_return(values: pd.Series) -> float:
        observed = values.dropna()
        if len(observed) < 2:
            return np.nan
        return observed.iloc[-1] / observed.iloc[0] - 1.0

    records = []
    for year in years:
        year_start = pd.Timestamp(year=year, month=1, day=1)
        year_end = pd.Timestamp(year=year, month=12, day=31)
        in_year = curves.loc[(curves.index >= year_start) & (curves.index <= year_end)]
        if in_year.empty:
            continue
        record = {"Year": year}
        record.update({name: _first_to_last_return(in_year[name]) for name in curves.columns})
        records.append(record)

    if records:
        return pd.DataFrame(records).set_index("Year")
    return pd.DataFrame()
|
||||
|
||||
|
||||
def run_alpha_pipeline(
    market_data,
    etf_close,
    pit_membership=None,
    windows=(1, 2, 3, 5, 10),
    top_n=10,
    fundamental_score: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Build the strategy equity curves and return their trailing-window summary.

    All arguments except ``windows`` are forwarded to
    ``build_alpha_equity_curves``; ``windows`` is forwarded to
    ``summarize_equity_curves``.
    """
    curve_inputs = {
        "market_data": market_data,
        "etf_close": etf_close,
        "pit_membership": pit_membership,
        "top_n": top_n,
        "fundamental_score": fundamental_score,
    }
    return summarize_equity_curves(build_alpha_equity_curves(**curve_inputs), windows=windows)
|
||||
|
||||
|
||||
def run_saved_pit_alpha_pipeline(
|
||||
data_dir: str = "data",
|
||||
windows=(1, 2, 3, 5, 10),
|
||||
@@ -147,9 +246,59 @@ def run_saved_pit_alpha_pipeline(
|
||||
)
|
||||
|
||||
|
||||
def run_exploratory_fundamental_alpha_pipeline(
    data_dir: str = "data",
    market: str = "us_alpha_exploratory",
    windows=(1, 2, 3, 5, 10),
    top_n: int = 10,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Run the alpha pipeline with an exploratory fundamental score and save reports.

    Uses the saved ``{market}`` close/high/low/volume panels when all four are
    non-empty; otherwise falls back to the cached ``us.csv`` closes with
    all-NaN high/low/volume placeholders. Writes the window summary and the
    yearly returns as CSV files under ``data_dir``.

    Args:
        data_dir: Directory holding the input CSVs and receiving the outputs.
        market: File-name prefix of the saved OHLCV panels.
        windows: Trailing windows passed to ``summarize_equity_curves``.
        top_n: Portfolio size passed to ``build_alpha_equity_curves``.

    Returns:
        ``(windows_df, yearly_df)``.
    """
    cached_close = _read_panel_csv(f"{data_dir}/us.csv")
    stock_tickers = [ticker for ticker in cached_close.columns if ticker not in ETF_TICKERS]

    # Saved panels are optional: a missing or empty file yields an empty frame.
    saved_panels = {
        "close": _read_nonempty_panel_csv(f"{data_dir}/{market}.csv"),
        "high": _read_nonempty_panel_csv(f"{data_dir}/{market}_high.csv"),
        "low": _read_nonempty_panel_csv(f"{data_dir}/{market}_low.csv"),
        "volume": _read_nonempty_panel_csv(f"{data_dir}/{market}_volume.csv"),
    }
    saved_etf = _read_nonempty_panel_csv(f"{data_dir}/us_etf.csv")

    if all(not panel.empty for panel in saved_panels.values()):
        market_data = {
            field: panel.reindex(columns=stock_tickers)
            for field, panel in saved_panels.items()
        }
    else:
        # Close-only fallback: keep the OHLCV keys but fill them with NaN frames.
        close = cached_close.reindex(columns=stock_tickers)
        market_data = {"close": close}
        for field in ("high", "low", "volume"):
            market_data[field] = pd.DataFrame(index=close.index, columns=close.columns, dtype=float)

    if not saved_etf.empty and "SPY" in saved_etf.columns:
        etf_close = saved_etf
    else:
        etf_close = cached_close.reindex(columns=["SPY"]).dropna(how="all")

    fundamental_score = build_exploratory_fundamental_score(market_data["close"], data_dir=data_dir)
    equity_curves = build_alpha_equity_curves(
        market_data=market_data,
        etf_close=etf_close,
        pit_membership=None,
        top_n=top_n,
        fundamental_score=fundamental_score,
    )

    windows_df = summarize_equity_curves(equity_curves, windows=windows)

    close_index = market_data["close"].index
    first_year = int(close_index.min().year)
    last_year = int(close_index.max().year)
    yearly_df = summarize_yearly_returns(equity_curves, list(range(first_year, last_year + 1)))

    windows_df.to_csv(f"{data_dir}/us_alpha_fundamental_windows.csv", index=False)
    yearly_df.to_csv(f"{data_dir}/us_alpha_fundamental_10y_yearly.csv")
    return windows_df, yearly_df
|
||||
|
||||
|
||||
def main() -> None:
|
||||
summary = run_saved_pit_alpha_pipeline()
|
||||
print(summary.to_string(index=False))
|
||||
windows_df, yearly_df = run_exploratory_fundamental_alpha_pipeline()
|
||||
print("=== Window Summary ===")
|
||||
print(windows_df.to_string(index=False))
|
||||
print("\n=== Yearly Returns ===")
|
||||
print((yearly_df * 100.0).round(2).to_string())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -124,6 +124,118 @@ class USAlphaPipelineTests(unittest.TestCase):
|
||||
self.assertEqual(len(summary), 2)
|
||||
self.assertTrue(summary[["CAGR", "Sharpe", "MaxDD", "TotalRet"]].notna().all().all())
|
||||
|
||||
def test_run_alpha_pipeline_includes_fundamental_variants_when_score_is_supplied(self):
    """With full OHLCV data and a fundamental score, all five strategies are reported."""
    from research.us_alpha_pipeline import run_alpha_pipeline

    n_days = 400
    dates = pd.date_range("2023-01-01", periods=n_days, freq="D")

    def ramp(start, step):
        # Linearly increasing series: start, start + step, start + 2 * step, ...
        return [start + step * i for i in range(n_days)]

    def flat(value):
        return [value] * n_days

    close = pd.DataFrame(
        {"AAA": ramp(50.0, 0.20), "BBB": ramp(55.0, 0.12), "CCC": ramp(60.0, 0.05)},
        index=dates,
    )
    volume = pd.DataFrame(
        {"AAA": flat(1_500_000.0), "BBB": flat(1_400_000.0), "CCC": flat(1_300_000.0)},
        index=dates,
    )
    etf_close = pd.DataFrame(
        {"SPY": ramp(300.0, 0.8), "QQQ": ramp(280.0, 1.1), "XLF": ramp(200.0, 0.4)},
        index=dates,
    )
    fundamental_score = pd.DataFrame(
        {"AAA": flat(0.9), "BBB": flat(0.6), "CCC": flat(0.3)},
        index=dates,
    )

    summary = run_alpha_pipeline(
        market_data={"close": close, "high": close + 1.0, "low": close - 1.0, "volume": volume},
        etf_close=etf_close,
        pit_membership=None,
        windows=(1,),
        top_n=2,
        fundamental_score=fundamental_score,
    )

    expected_strategies = {
        "breakout_regime",
        "rank_blend_regime",
        "fundamental_regime",
        "breakout_fundamental_regime",
        "rank_blend_fundamental_regime",
    }
    self.assertEqual(set(summary["strategy"]), expected_strategies)
|
||||
|
||||
def test_run_alpha_pipeline_close_only_fallback_skips_breakout_and_uses_spy_regime(self):
    """With placeholder high/low/volume panels, no breakout strategy is reported."""
    from research.us_alpha_pipeline import run_alpha_pipeline

    n_days = 400
    dates = pd.date_range("2023-01-01", periods=n_days, freq="D")

    def ramp(start, step):
        # Linearly increasing series: start, start + step, start + 2 * step, ...
        return [start + step * i for i in range(n_days)]

    close = pd.DataFrame(
        {"AAA": ramp(50.0, 0.20), "BBB": ramp(55.0, 0.12), "CCC": ramp(60.0, 0.05)},
        index=dates,
    )
    # High/low/volume are shaped like close but carry no values.
    market_data = {"close": close}
    for field in ("high", "low", "volume"):
        market_data[field] = pd.DataFrame(index=dates, columns=close.columns)

    # SPY is the only ETF column supplied.
    etf_close = pd.DataFrame({"SPY": ramp(300.0, 0.8)}, index=dates)
    fundamental_score = pd.DataFrame(
        {"AAA": [0.9] * n_days, "BBB": [0.6] * n_days, "CCC": [0.3] * n_days},
        index=dates,
    )

    summary = run_alpha_pipeline(
        market_data=market_data,
        etf_close=etf_close,
        pit_membership=None,
        windows=(1,),
        top_n=2,
        fundamental_score=fundamental_score,
    )

    expected_strategies = {
        "rank_blend_regime",
        "fundamental_regime",
        "rank_blend_fundamental_regime",
    }
    self.assertEqual(set(summary["strategy"]), expected_strategies)
    metrics = summary[["CAGR", "Sharpe", "MaxDD", "TotalRet"]]
    self.assertTrue(metrics.notna().all().all())
|
||||
|
||||
def test_run_saved_pit_alpha_pipeline_reads_saved_inputs(self):
|
||||
from research.us_alpha_pipeline import run_saved_pit_alpha_pipeline
|
||||
|
||||
|
||||
Reference in New Issue
Block a user