# Public API of the research modules reproduced below. The unittest case at the
# bottom is deliberately left out so that a star-import never drags tests along.
__all__ = [
    "TRAILING_HIGH_WINDOW",
    "COMPRESSION_WINDOW",
    "VOLUME_WINDOW",
    "breakout_after_compression_score",
    "LONG_MA_WINDOW",
    "RS_WINDOW",
    "build_regime_filter",
]


# ---------------------------------------------------------------------------
# File: research/event_factors.py
# ---------------------------------------------------------------------------
from typing import Optional

import numpy as np
import pandas as pd


# Default look-back lengths, expressed in rows of the input frames.
TRAILING_HIGH_WINDOW = 60
COMPRESSION_WINDOW = 20
VOLUME_WINDOW = 20


def breakout_after_compression_score(
    close: pd.DataFrame,
    high: pd.DataFrame,
    low: pd.DataFrame,
    volume: pd.DataFrame,
    *,
    trailing_high_window: Optional[int] = None,
    compression_window: Optional[int] = None,
    volume_window: Optional[int] = None,
) -> pd.DataFrame:
    """Score breakout setups and shift the result so it is tradable next day.

    The score is the plain sum of three components, each computed per column:

    * ``close`` divided by its rolling maximum over ``trailing_high_window``
      rows (the window includes the current row);
    * the negated ``(max high - min low) / midpoint`` over
      ``compression_window`` rows, so a tighter range scores higher;
    * ``volume`` divided by its rolling median over ``volume_window`` rows.

    Every rolling statistic requires a completely filled window, and a zero
    denominator is turned into NaN, so the affected cells are NaN rather than
    infinite. A NaN in any component makes the summed score NaN.

    Args:
        close: Close prices; its sorted index and its columns define the
            layout of the output.
        high: High prices, aligned to ``close`` (missing labels become NaN).
        low: Low prices, aligned to ``close`` (missing labels become NaN).
        volume: Volumes, aligned to ``close`` (missing labels become NaN).
        trailing_high_window: Rows used for the trailing high. ``None`` uses
            ``TRAILING_HIGH_WINDOW``.
        compression_window: Rows used for the high/low range. ``None`` uses
            ``COMPRESSION_WINDOW``.
        volume_window: Rows used for the median volume. ``None`` uses
            ``VOLUME_WINDOW``.

    Returns:
        A frame shaped like the sorted ``close`` whose row *t* holds the score
        computed from data up to and including row *t - 1*; the first row is
        therefore NaN.
    """
    # Resolve defaults at call time so the module constants stay the single
    # source of truth, exactly as when they were referenced directly.
    if trailing_high_window is None:
        trailing_high_window = TRAILING_HIGH_WINDOW
    if compression_window is None:
        compression_window = COMPRESSION_WINDOW
    if volume_window is None:
        volume_window = VOLUME_WINDOW

    close = close.sort_index()
    # Reindexing onto the sorted close axes already yields sorted frames, so
    # no further sort is needed.
    high = high.reindex(index=close.index, columns=close.columns)
    low = low.reindex(index=close.index, columns=close.columns)
    volume = volume.reindex(index=close.index, columns=close.columns)

    trailing_high = close.rolling(
        trailing_high_window, min_periods=trailing_high_window
    ).max()
    proximity_to_high = close / trailing_high.replace(0, np.nan)

    recent_high = high.rolling(
        compression_window, min_periods=compression_window
    ).max()
    recent_low = low.rolling(
        compression_window, min_periods=compression_window
    ).min()
    recent_mid = (recent_high + recent_low) / 2
    # Negated so that a narrower range contributes a larger (less negative) value.
    compressed_range = -((recent_high - recent_low) / recent_mid.replace(0, np.nan))

    median_volume = volume.rolling(volume_window, min_periods=volume_window).median()
    abnormal_volume = volume / median_volume.replace(0, np.nan)

    score = proximity_to_high + compressed_range + abnormal_volume
    # Lag by one row so the value stored at row t never uses row t's own data.
    return score.shift(1)


# ---------------------------------------------------------------------------
# File: research/regime_filters.py
# ---------------------------------------------------------------------------
from typing import Optional

import pandas as pd


# Default look-back lengths, expressed in rows of the input frame.
LONG_MA_WINDOW = 200
RS_WINDOW = 63


def build_regime_filter(
    etf_close: pd.DataFrame,
    market_col: str = "SPY",
    *,
    long_ma_window: Optional[int] = None,
    rs_window: Optional[int] = None,
) -> pd.Series:
    """Return a next-day tradable regime flag based on market trend and ETF leadership.

    The flag for a row is True when, on the previous row, both conditions held:

    * the market column closed above its ``long_ma_window``-row simple moving
      average (a full window is required);
    * at least one other column had a larger ``rs_window``-row percentage
      change than the market column.

    Comparisons involving NaN evaluate to False, so incomplete windows and
    missing prices yield False rather than an error.

    Args:
        etf_close: Close prices, one column per ETF, including ``market_col``.
        market_col: Name of the column used as the market benchmark.
        long_ma_window: Rows in the market moving average. ``None`` uses
            ``LONG_MA_WINDOW``.
        rs_window: Rows spanned by the percentage change. ``None`` uses
            ``RS_WINDOW``.

    Returns:
        A boolean Series on the sorted index of ``etf_close``, lagged by one
        row with False filled into the first position.

    Raises:
        KeyError: If ``market_col`` is not a column of ``etf_close``.
    """
    # Fail fast before doing any work on the frame.
    if market_col not in etf_close.columns:
        raise KeyError(f"{market_col} not found in etf_close")

    if long_ma_window is None:
        long_ma_window = LONG_MA_WINDOW
    if rs_window is None:
        rs_window = RS_WINDOW

    prices = etf_close.sort_index()

    market = prices[market_col]
    market_ma = market.rolling(long_ma_window, min_periods=long_ma_window).mean()
    market_ok = market.gt(market_ma)

    # fill_method=None: do not forward-fill gaps before computing returns.
    rs = prices.pct_change(rs_window, fill_method=None)
    non_market_rs = rs.drop(columns=[market_col], errors="ignore")
    leader_ok = non_market_rs.gt(rs[market_col], axis=0).any(axis=1)

    regime = (market_ok & leader_ok).astype(bool)
    # fill_value=False keeps the bool dtype instead of introducing NaN.
    return regime.shift(1, fill_value=False)


# ---------------------------------------------------------------------------
# File: tests/test_alpha_signals.py
# ---------------------------------------------------------------------------
import unittest
import warnings

import numpy as np
import pandas as pd


class AlphaSignalTests(unittest.TestCase):
    """Behavioural checks for the regime filter and the breakout score."""

    def test_build_regime_filter_requires_market_trend_and_non_market_leader(self):
        """The flag needs both an up-trending market and an outperforming ETF."""
        from research.regime_filters import build_regime_filter

        dates = pd.date_range("2023-01-01", periods=260, freq="D")
        spy = pd.Series([100.0 + i for i in range(260)], index=dates)
        qqq_leader = pd.Series([100.0 + 1.4 * i for i in range(260)], index=dates)
        xlu = pd.Series([100.0 + 0.2 * i for i in range(260)], index=dates)

        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            bullish = build_regime_filter(
                pd.DataFrame({"SPY": spy, "QQQ": qqq_leader, "XLU": xlu})
            )
            qqq_laggard = pd.Series([100.0 + 0.5 * i for i in range(260)], index=dates)
            no_leader = build_regime_filter(
                pd.DataFrame({"SPY": spy, "QQQ": qqq_laggard, "XLU": xlu})
            )

        self.assertEqual(len(caught), 0)
        self.assertFalse(bool(bullish.iloc[199]))
        self.assertTrue(bool(bullish.iloc[-1]))
        self.assertFalse(bool(no_leader.iloc[-1]))

    def test_build_regime_filter_handles_internal_missing_prices_without_warnings(self):
        """A NaN inside the history must not warn or change the output dtype."""
        from research.regime_filters import build_regime_filter

        dates = pd.date_range("2023-01-01", periods=260, freq="D")
        spy = pd.Series([100.0 + i for i in range(260)], index=dates)
        qqq = pd.Series([100.0 + 1.4 * i for i in range(260)], index=dates)
        qqq.iloc[120] = np.nan
        etf_close = pd.DataFrame({"SPY": spy, "QQQ": qqq, "XLU": 100.0}, index=dates)

        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            regime = build_regime_filter(etf_close)

        self.assertEqual(len(caught), 0)
        self.assertEqual(str(regime.dtype), "bool")

    def test_build_regime_filter_accepts_custom_windows(self):
        """Short windows let the flag switch on within a short history."""
        from research.regime_filters import build_regime_filter

        dates = pd.date_range("2023-01-01", periods=8, freq="D")
        etf_close = pd.DataFrame(
            {
                "SPY": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0],
                "QQQ": [10.0, 12.0, 14.0, 16.0, 18.0, 20.0, 22.0, 24.0],
            },
            index=dates,
        )

        regime = build_regime_filter(etf_close, long_ma_window=3, rs_window=2)

        self.assertEqual(str(regime.dtype), "bool")
        self.assertEqual(regime.tolist(), [False] * 3 + [True] * 5)

    def test_breakout_after_compression_score_is_shifted_and_rewards_breakout_profile(self):
        """The last row must depend only on earlier rows and favour the breakout."""
        from research.event_factors import breakout_after_compression_score

        dates = pd.date_range("2024-01-01", periods=80, freq="D")

        aaa_close = [100.0 + i for i in range(60)] + [159.0 + 0.05 * i for i in range(20)]
        bbb_close = [100.0 + i for i in range(60)] + [150.0 - i for i in range(20)]
        close = pd.DataFrame({"AAA": aaa_close, "BBB": bbb_close}, index=dates)

        high = pd.DataFrame(
            {
                "AAA": [value + 0.4 for value in aaa_close],
                "BBB": [value + 4.0 for value in bbb_close],
            },
            index=dates,
        )
        low = pd.DataFrame(
            {
                "AAA": [value - 0.4 for value in aaa_close],
                "BBB": [value - 4.0 for value in bbb_close],
            },
            index=dates,
        )
        volume = pd.DataFrame(
            {
                "AAA": [1_000.0] * 80,
                "BBB": [1_000.0] * 80,
            },
            index=dates,
        )
        volume.loc[dates[-2], "AAA"] = 6_000.0

        shifted_result = breakout_after_compression_score(close, high, low, volume)
        self.assertGreater(
            shifted_result.loc[dates[-1], "AAA"],
            shifted_result.loc[dates[-1], "BBB"],
        )

        # Rewriting only the final day's inputs must leave the final score alone.
        changed_last_day = close.copy()
        changed_last_day_high = high.copy()
        changed_last_day_low = low.copy()
        changed_last_day_volume = volume.copy()
        changed_last_day.loc[dates[-1], "AAA"] = 120.0
        changed_last_day_high.loc[dates[-1], "AAA"] = 130.0
        changed_last_day_low.loc[dates[-1], "AAA"] = 110.0
        changed_last_day_volume.loc[dates[-1], "AAA"] = 20_000.0

        last_day_changed_result = breakout_after_compression_score(
            changed_last_day,
            changed_last_day_high,
            changed_last_day_low,
            changed_last_day_volume,
        )
        self.assertEqual(
            shifted_result.loc[dates[-1], "AAA"],
            last_day_changed_result.loc[dates[-1], "AAA"],
        )

    def test_breakout_after_compression_score_keeps_float_output_when_denominators_hit_zero(self):
        """Zero denominators must give NaN in a float column, not inf or object."""
        from research.event_factors import breakout_after_compression_score

        dates = pd.date_range("2024-01-01", periods=70, freq="D")
        close = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
        high = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
        low = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
        volume = pd.DataFrame({"AAA": [0.0] * 70}, index=dates)

        score = breakout_after_compression_score(close, high, low, volume)

        self.assertEqual(str(score.dtypes["AAA"]), "float64")
        self.assertTrue(pd.isna(score.iloc[-1]["AAA"]))

    def test_breakout_after_compression_score_accepts_custom_windows(self):
        """Short windows produce hand-checkable scores on a tiny history."""
        from research.event_factors import breakout_after_compression_score

        dates = pd.date_range("2024-01-01", periods=6, freq="D")
        close = pd.DataFrame({"AAA": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]}, index=dates)
        high = close + 1.0
        low = close - 1.0
        volume = pd.DataFrame(
            {"AAA": [10.0, 10.0, 10.0, 10.0, 30.0, 10.0]}, index=dates
        )

        score = breakout_after_compression_score(
            close,
            high,
            low,
            volume,
            trailing_high_window=3,
            compression_window=2,
            volume_window=3,
        )

        self.assertTrue(score["AAA"].iloc[:3].isna().all())
        # Row 3 reflects row 2: 3/3 - (4 - 1)/2.5 + 10/10.
        self.assertAlmostEqual(score["AAA"].iloc[3], 0.8)
        # Row 5 reflects row 4: 5/5 - (6 - 3)/4.5 + 30/10.
        self.assertAlmostEqual(score["AAA"].iloc[5], 1.0 - 3.0 / 4.5 + 3.0)


if __name__ == "__main__":
    unittest.main()