feat: add regime and breakout alpha modules
This commit is contained in:
34
research/event_factors.py
Normal file
34
research/event_factors.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
|
||||
TRAILING_HIGH_WINDOW = 60
|
||||
COMPRESSION_WINDOW = 20
|
||||
VOLUME_WINDOW = 20
|
||||
|
||||
|
||||
def breakout_after_compression_score(
|
||||
close: pd.DataFrame,
|
||||
high: pd.DataFrame,
|
||||
low: pd.DataFrame,
|
||||
volume: pd.DataFrame,
|
||||
) -> pd.DataFrame:
|
||||
"""Score breakout setups and shift the result so it is tradable next day."""
|
||||
close = close.sort_index()
|
||||
high = high.reindex(index=close.index, columns=close.columns).sort_index()
|
||||
low = low.reindex(index=close.index, columns=close.columns).sort_index()
|
||||
volume = volume.reindex(index=close.index, columns=close.columns).sort_index()
|
||||
|
||||
trailing_high = close.rolling(TRAILING_HIGH_WINDOW, min_periods=TRAILING_HIGH_WINDOW).max()
|
||||
proximity_to_high = close / trailing_high.replace(0, np.nan)
|
||||
|
||||
recent_high = high.rolling(COMPRESSION_WINDOW, min_periods=COMPRESSION_WINDOW).max()
|
||||
recent_low = low.rolling(COMPRESSION_WINDOW, min_periods=COMPRESSION_WINDOW).min()
|
||||
recent_mid = (recent_high + recent_low) / 2
|
||||
compressed_range = -((recent_high - recent_low) / recent_mid.replace(0, np.nan))
|
||||
|
||||
median_volume = volume.rolling(VOLUME_WINDOW, min_periods=VOLUME_WINDOW).median()
|
||||
abnormal_volume = volume / median_volume.replace(0, np.nan)
|
||||
|
||||
score = proximity_to_high + compressed_range + abnormal_volume
|
||||
return score.shift(1)
|
||||
23
research/regime_filters.py
Normal file
23
research/regime_filters.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import pandas as pd
|
||||
|
||||
|
||||
LONG_MA_WINDOW = 200
|
||||
RS_WINDOW = 63
|
||||
|
||||
|
||||
def build_regime_filter(etf_close: pd.DataFrame, market_col: str = "SPY") -> pd.Series:
|
||||
"""Return a next-day tradable regime flag based on market trend and ETF leadership."""
|
||||
prices = etf_close.sort_index()
|
||||
if market_col not in prices.columns:
|
||||
raise KeyError(f"{market_col} not found in etf_close")
|
||||
|
||||
market = prices[market_col]
|
||||
market_ma = market.rolling(LONG_MA_WINDOW, min_periods=LONG_MA_WINDOW).mean()
|
||||
market_ok = market.gt(market_ma)
|
||||
|
||||
rs = prices.pct_change(RS_WINDOW, fill_method=None)
|
||||
non_market_rs = rs.drop(columns=[market_col], errors="ignore")
|
||||
leader_ok = non_market_rs.gt(rs[market_col], axis=0).any(axis=1)
|
||||
|
||||
regime = (market_ok & leader_ok).astype(bool)
|
||||
return regime.shift(1, fill_value=False)
|
||||
118
tests/test_alpha_signals.py
Normal file
118
tests/test_alpha_signals.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import unittest
|
||||
import warnings
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class AlphaSignalTests(unittest.TestCase):
|
||||
def test_build_regime_filter_requires_market_trend_and_non_market_leader(self):
|
||||
from research.regime_filters import build_regime_filter
|
||||
|
||||
dates = pd.date_range("2023-01-01", periods=260, freq="D")
|
||||
spy = pd.Series([100.0 + i for i in range(260)], index=dates)
|
||||
qqq_leader = pd.Series([100.0 + 1.4 * i for i in range(260)], index=dates)
|
||||
xlu = pd.Series([100.0 + 0.2 * i for i in range(260)], index=dates)
|
||||
|
||||
with warnings.catch_warnings(record=True) as caught:
|
||||
warnings.simplefilter("always")
|
||||
bullish = build_regime_filter(pd.DataFrame({"SPY": spy, "QQQ": qqq_leader, "XLU": xlu}))
|
||||
qqq_laggard = pd.Series([100.0 + 0.5 * i for i in range(260)], index=dates)
|
||||
no_leader = build_regime_filter(pd.DataFrame({"SPY": spy, "QQQ": qqq_laggard, "XLU": xlu}))
|
||||
|
||||
self.assertEqual(len(caught), 0)
|
||||
self.assertFalse(bool(bullish.iloc[199]))
|
||||
self.assertTrue(bool(bullish.iloc[-1]))
|
||||
self.assertFalse(bool(no_leader.iloc[-1]))
|
||||
|
||||
def test_build_regime_filter_handles_internal_missing_prices_without_warnings(self):
|
||||
from research.regime_filters import build_regime_filter
|
||||
|
||||
dates = pd.date_range("2023-01-01", periods=260, freq="D")
|
||||
spy = pd.Series([100.0 + i for i in range(260)], index=dates)
|
||||
qqq = pd.Series([100.0 + 1.4 * i for i in range(260)], index=dates)
|
||||
qqq.iloc[120] = np.nan
|
||||
etf_close = pd.DataFrame({"SPY": spy, "QQQ": qqq, "XLU": 100.0}, index=dates)
|
||||
|
||||
with warnings.catch_warnings(record=True) as caught:
|
||||
warnings.simplefilter("always")
|
||||
regime = build_regime_filter(etf_close)
|
||||
|
||||
self.assertEqual(len(caught), 0)
|
||||
self.assertEqual(str(regime.dtype), "bool")
|
||||
|
||||
def test_breakout_after_compression_score_is_shifted_and_rewards_breakout_profile(self):
|
||||
from research.event_factors import breakout_after_compression_score
|
||||
|
||||
dates = pd.date_range("2024-01-01", periods=80, freq="D")
|
||||
|
||||
aaa_close = [100.0 + i for i in range(60)] + [159.0 + 0.05 * i for i in range(20)]
|
||||
bbb_close = [100.0 + i for i in range(60)] + [150.0 - i for i in range(20)]
|
||||
close = pd.DataFrame({"AAA": aaa_close, "BBB": bbb_close}, index=dates)
|
||||
|
||||
high = pd.DataFrame(
|
||||
{
|
||||
"AAA": [value + 0.4 for value in aaa_close],
|
||||
"BBB": [value + 4.0 for value in bbb_close],
|
||||
},
|
||||
index=dates,
|
||||
)
|
||||
low = pd.DataFrame(
|
||||
{
|
||||
"AAA": [value - 0.4 for value in aaa_close],
|
||||
"BBB": [value - 4.0 for value in bbb_close],
|
||||
},
|
||||
index=dates,
|
||||
)
|
||||
volume = pd.DataFrame(
|
||||
{
|
||||
"AAA": [1_000.0] * 79 + [1_000.0],
|
||||
"BBB": [1_000.0] * 80,
|
||||
},
|
||||
index=dates,
|
||||
)
|
||||
volume.loc[dates[-2], "AAA"] = 6_000.0
|
||||
|
||||
shifted_result = breakout_after_compression_score(close, high, low, volume)
|
||||
self.assertGreater(
|
||||
shifted_result.loc[dates[-1], "AAA"],
|
||||
shifted_result.loc[dates[-1], "BBB"],
|
||||
)
|
||||
|
||||
changed_last_day = close.copy()
|
||||
changed_last_day_high = high.copy()
|
||||
changed_last_day_low = low.copy()
|
||||
changed_last_day_volume = volume.copy()
|
||||
changed_last_day.loc[dates[-1], "AAA"] = 120.0
|
||||
changed_last_day_high.loc[dates[-1], "AAA"] = 130.0
|
||||
changed_last_day_low.loc[dates[-1], "AAA"] = 110.0
|
||||
changed_last_day_volume.loc[dates[-1], "AAA"] = 20_000.0
|
||||
|
||||
last_day_changed_result = breakout_after_compression_score(
|
||||
changed_last_day,
|
||||
changed_last_day_high,
|
||||
changed_last_day_low,
|
||||
changed_last_day_volume,
|
||||
)
|
||||
self.assertEqual(
|
||||
shifted_result.loc[dates[-1], "AAA"],
|
||||
last_day_changed_result.loc[dates[-1], "AAA"],
|
||||
)
|
||||
|
||||
def test_breakout_after_compression_score_keeps_float_output_when_denominators_hit_zero(self):
|
||||
from research.event_factors import breakout_after_compression_score
|
||||
|
||||
dates = pd.date_range("2024-01-01", periods=70, freq="D")
|
||||
close = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
|
||||
high = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
|
||||
low = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
|
||||
volume = pd.DataFrame({"AAA": [0.0] * 70}, index=dates)
|
||||
|
||||
score = breakout_after_compression_score(close, high, low, volume)
|
||||
|
||||
self.assertEqual(str(score.dtypes["AAA"]), "float64")
|
||||
self.assertTrue(pd.isna(score.iloc[-1]["AAA"]))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user