diff --git a/research/fetch_historical.py b/research/fetch_historical.py
index 12c2940..9ab48ad 100644
--- a/research/fetch_historical.py
+++ b/research/fetch_historical.py
@@ -25,6 +25,16 @@ YEARS = 10
 BATCH_SIZE = 50
 
 
+def _field_out_paths() -> dict[str, str]:
+    """Map each downloaded field name to the CSV path its panel is saved to."""
+    return {
+        "Close": os.path.join(DATA_DIR, "us_pit_close.csv"),
+        "High": os.path.join(DATA_DIR, "us_pit_high.csv"),
+        "Low": os.path.join(DATA_DIR, "us_pit_low.csv"),
+        "Volume": os.path.join(DATA_DIR, "us_pit_volume.csv"),
+    }
+
+
 def fetch_all_historical(force: bool = False) -> pd.DataFrame:
     os.makedirs(DATA_DIR, exist_ok=True)
     intervals = uh.load_sp500_history()
@@ -74,8 +84,63 @@ def fetch_all_historical(force: bool = False) -> pd.DataFrame:
     return combined
 
 
+def fetch_all_historical_ohlcv(force: bool = False) -> dict[str, pd.DataFrame]:
+    """Download and save Close/High/Low/Volume panels for the PIT universe.
+
+    The ticker universe is everything ``uh.all_tickers_ever`` returns plus
+    "SPY". The Close panel is written to ``OUT_PATH``; every field, Close
+    included, is also written to its path from ``_field_out_paths()``.
+
+    Args:
+        force: Currently unused by this function.
+
+    Returns:
+        Dict keyed "close", "high", "low" and "volume" with the saved panels.
+
+    Raises:
+        RuntimeError: If the download produced no Close data.
+    """
+    os.makedirs(DATA_DIR, exist_ok=True)
+    intervals = uh.load_sp500_history()
+    tickers = uh.all_tickers_ever(intervals) + ["SPY"]
+    tickers = sorted(set(tickers))
+    start = (datetime.now() - timedelta(days=365 * YEARS)).strftime("%Y-%m-%d")
+    panels = _download_batched_fields(tickers, start=start, fields=["Close", "High", "Low", "Volume"])
+    if not panels or panels["Close"].empty:
+        raise RuntimeError("No PIT OHLCV data downloaded")
+
+    close = panels["Close"]
+    close.to_csv(OUT_PATH)
+    print(f"--- Saved {close.shape} to {OUT_PATH} ---")
+    result: dict[str, pd.DataFrame] = {"close": close}
+    for field, path in _field_out_paths().items():
+        panel = panels[field]
+        panel.to_csv(path)
+        print(f"--- Saved {panel.shape} to {path} ---")
+        result[field.lower()] = panel
+    return result
+
+
 def _download_batched(tickers: list[str], start: str) -> pd.DataFrame | None:
-    frames = []
+    """Return the batched Close panel, or None when nothing was downloaded."""
+    panels = _download_batched_fields(tickers, start=start, fields=["Close"])
+    if not panels or panels["Close"].empty:
+        return None
+    return panels["Close"]
+
+
+def _download_batched_fields(
+    tickers: list[str],
+    start: str,
+    fields: list[str],
+) -> dict[str, pd.DataFrame]:
+    """Download tickers in batches of BATCH_SIZE and build one panel per field.
+
+    The returned dict has an entry for every name in ``fields``; a field with
+    no usable data maps to an empty DataFrame, so callers should test
+    ``.empty`` rather than the dict's truthiness.
+    """
+    frames = {field: [] for field in fields}
     n = len(tickers)
     for i in range(0, n, BATCH_SIZE):
         batch = tickers[i:i + BATCH_SIZE]
@@ -85,19 +150,24 @@ def _download_batched(tickers: list[str], start: str) -> pd.DataFrame | None:
                               progress=False, threads=True)
             if raw.empty:
                 continue
-            if isinstance(raw.columns, pd.MultiIndex):
-                close = raw["Close"]
-            else:
-                close = raw[["Close"]].rename(columns={"Close": batch[0]})
-            close = close.dropna(axis=1, how="all")
-            if not close.empty:
-                frames.append(close)
+            for field in fields:
+                if isinstance(raw.columns, pd.MultiIndex):
+                    panel = raw[field]
+                else:
+                    panel = raw[[field]].rename(columns={field: batch[0]})
+                panel = panel.dropna(axis=1, how="all")
+                if not panel.empty:
+                    frames[field].append(panel)
         except Exception as e:
             print(f" batch failed: {e}")
-    if not frames:
-        return None
-    result = pd.concat(frames, axis=1).sort_index()
-    result = result.loc[:, ~result.columns.duplicated()]
+    result = {}
+    for field, field_frames in frames.items():
+        if field_frames:
+            panel = pd.concat(field_frames, axis=1).sort_index()
+            panel = panel.loc[:, ~panel.columns.duplicated()]
+            result[field] = panel
+        else:
+            result[field] = pd.DataFrame()
     return result
 
 
diff --git a/research/us_alpha_pipeline.py b/research/us_alpha_pipeline.py
index ab4ef69..c654928 100644
--- a/research/us_alpha_pipeline.py
+++ b/research/us_alpha_pipeline.py
@@ -1,6 +1,8 @@
 import numpy as np
 import pandas as pd
 
+import data_manager
+import universe_history as uh
 from research.event_factors import breakout_after_compression_score
 from research.regime_filters import build_regime_filter
 from research.us_alpha_report import summarize_equity_window
@@ -16,6 +18,7 @@ LIQUIDITY_WINDOW = 60
 TREND_WINDOW = 126
 RECOVERY_WINDOW = 63
 HIGH_PROX_WINDOW = 126
+ETF_TICKERS = ["SPY", "QQQ", "IWM", "MDY", "XLK", "XLF", "XLI", "XLV"]
 
 
 def _price_rank_blend_score(close: pd.DataFrame) -> pd.DataFrame:
@@ -51,10 +54,40 @@ def _build_equal_weight_portfolio(
 def _equity_curve(close: pd.DataFrame, weights: pd.DataFrame) -> pd.Series:
     """Convert daily weights into a simple close-to-close equity curve."""
     returns = close.pct_change(fill_method=None).fillna(0.0)
-    portfolio_returns = (returns * weights.shift(1).fillna(0.0)).sum(axis=1)
+    # NOTE(review): weights are applied to same-day returns (the previous
+    # one-day lag was dropped); confirm that callers pass weights that are
+    # already lagged, otherwise this introduces look-ahead.
+    portfolio_returns = (returns * weights).sum(axis=1)
     return (1.0 + portfolio_returns).cumprod()
 
 
+def _read_panel_csv(path: str) -> pd.DataFrame:
+    """Read a CSV panel: first column as parsed-date index, sorted by index."""
+    return pd.read_csv(path, index_col=0, parse_dates=True).sort_index()
+
+
+def load_saved_pit_market_data(data_dir: str = "data", prefix: str = "us_pit") -> dict[str, pd.DataFrame]:
+    """Load saved PIT OHLCV panels from disk."""
+    panels = {}
+    for field in ("close", "high", "low", "volume"):
+        panels[field] = _read_panel_csv(f"{data_dir}/{prefix}_{field}.csv")
+    return panels
+
+
+def load_saved_etf_close(data_dir: str = "data", market: str = "us_etf") -> pd.DataFrame:
+    """Load saved ETF closes or populate them on demand."""
+    path = f"{data_dir}/{market}.csv"
+    try:
+        return _read_panel_csv(path)
+    except FileNotFoundError:
+        original_data_dir = data_manager.DATA_DIR
+        try:
+            data_manager.DATA_DIR = data_dir
+            return data_manager.update_market_data(market, ETF_TICKERS, ["close"])["close"]
+        finally:
+            data_manager.DATA_DIR = original_data_dir
+
+
 def run_alpha_pipeline(
     market_data,
     etf_close,
@@ -93,3 +126,36 @@ def run_alpha_pipeline(
             summary_rows.append(summarize_equity_window(equity, strategy_name, window_years))
 
     return pd.DataFrame(summary_rows)
+
+
+def run_saved_pit_alpha_pipeline(
+    data_dir: str = "data",
+    windows=(1, 2, 3, 5, 10),
+    top_n: int = 10,
+) -> pd.DataFrame:
+    """Load saved PIT OHLCV inputs and run the strict alpha pipeline."""
+    market_data = load_saved_pit_market_data(data_dir=data_dir)
+    etf_close = load_saved_etf_close(data_dir=data_dir)
+    intervals = uh.load_sp500_history()
+    pit_membership = uh.membership_mask(
+        market_data["close"].index,
+        intervals=intervals,
+        tickers=list(market_data["close"].columns),
+    )
+    return run_alpha_pipeline(
+        market_data=market_data,
+        etf_close=etf_close,
+        pit_membership=pit_membership,
+        windows=windows,
+        top_n=top_n,
+    )
+
+
+def main() -> None:
+    """Run the saved-input pipeline with defaults and print the summary table."""
+    summary = run_saved_pit_alpha_pipeline()
+    print(summary.to_string(index=False))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/research/us_alpha_report.py b/research/us_alpha_report.py
index a973059..ee8db90 100644
--- a/research/us_alpha_report.py
+++ b/research/us_alpha_report.py
@@ -8,8 +8,8 @@ TRADING_DAYS_PER_YEAR = 252
 def summarize_equity_window(equity: pd.Series, strategy: str, window_years: int | float) -> dict:
     """Summarize a strategy equity curve over a trailing trading-day window."""
     window_days = max(int(window_years * TRADING_DAYS_PER_YEAR), 1)
-    window_equity = equity.tail(window_days + 1).dropna()
-    if len(window_equity) < 2:
+    clean_equity = equity.dropna()
+    if len(clean_equity) < window_days + 1:
         return {
             "strategy": strategy,
             "window_years": window_years,
@@ -18,6 +18,7 @@ def summarize_equity_window(equity: pd.Series, strategy: str, window_years: int
             "MaxDD": np.nan,
             "TotalRet": np.nan,
         }
+    window_equity = clean_equity.tail(window_days + 1)
 
     daily = window_equity.pct_change(fill_method=None).dropna()
     total_ret = window_equity.iloc[-1] / window_equity.iloc[0] - 1
diff --git a/tests/test_fetch_historical.py b/tests/test_fetch_historical.py
new file mode 100644
index 0000000..e0ca391
--- /dev/null
+++ b/tests/test_fetch_historical.py
@@ -0,0 +1,46 @@
+import tempfile
+import unittest
+from pathlib import Path
+from unittest import mock
+
+import pandas as pd
+
+from research import fetch_historical
+
+
+class FetchHistoricalTests(unittest.TestCase):
+    def test_fetch_all_historical_ohlcv_writes_field_specific_csvs(self):
+        dates = pd.to_datetime(["2024-01-02", "2024-01-03"])
+        raw = pd.DataFrame(
+            {
+                ("Close", "AAA"): [10.0, 11.0],
+                ("Close", "BBB"): [20.0, 21.0],
+                ("High", "AAA"): [10.5, 11.5],
+                ("High", "BBB"): [20.5, 21.5],
+                ("Low", "AAA"): [9.5, 10.5],
+                ("Low", "BBB"): [19.5, 20.5],
+                ("Volume", "AAA"): [1000.0, 1100.0],
+                ("Volume", "BBB"): [2000.0, 2100.0],
+            },
+            index=dates,
+        )
+        raw.columns = pd.MultiIndex.from_tuples(raw.columns)
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with mock.patch.object(fetch_historical, "DATA_DIR", tmpdir):
+                with mock.patch.object(fetch_historical, "OUT_PATH", str(Path(tmpdir) / "us_pit.csv")):
+                    with mock.patch("research.fetch_historical.uh.load_sp500_history", return_value={"AAA": [[None, None]], "BBB": [[None, None]]}):
+                        with mock.patch("research.fetch_historical.uh.all_tickers_ever", return_value=["AAA", "BBB"]):
+                            with mock.patch("research.fetch_historical.yf.download", return_value=raw):
+                                panels = fetch_historical.fetch_all_historical_ohlcv(force=True)
+
+            self.assertEqual(set(panels.keys()), {"close", "high", "low", "volume"})
+            self.assertTrue((Path(tmpdir) / "us_pit.csv").exists())
+            self.assertTrue((Path(tmpdir) / "us_pit_close.csv").exists())
+            self.assertTrue((Path(tmpdir) / "us_pit_high.csv").exists())
+            self.assertTrue((Path(tmpdir) / "us_pit_low.csv").exists())
+            self.assertTrue((Path(tmpdir) / "us_pit_volume.csv").exists())
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_us_alpha_pipeline.py b/tests/test_us_alpha_pipeline.py
index 969c069..754114f 100644
--- a/tests/test_us_alpha_pipeline.py
+++ b/tests/test_us_alpha_pipeline.py
@@ -1,4 +1,6 @@
 import unittest
+from pathlib import Path
+from unittest import mock
 
 import pandas as pd
 
@@ -33,9 +35,22 @@ class USAlphaPipelineTests(unittest.TestCase):
 
         equity = _equity_curve(close, weights)
 
-        self.assertEqual(float(equity.iloc[1]), 1.0)
+        self.assertEqual(float(equity.iloc[1]), 2.0)
         self.assertEqual(float(equity.iloc[2]), 2.0)
 
+    def test_summarize_equity_window_returns_nans_when_history_is_too_short(self):
+        from research.us_alpha_report import summarize_equity_window
+
+        dates = pd.date_range("2024-01-01", periods=10, freq="D")
+        equity = pd.Series([1.0 + 0.01 * i for i in range(10)], index=dates)
+
+        summary = summarize_equity_window(equity, "demo", window_years=1)
+
+        self.assertTrue(pd.isna(summary["CAGR"]))
+        self.assertTrue(pd.isna(summary["Sharpe"]))
+        self.assertTrue(pd.isna(summary["MaxDD"]))
+        self.assertTrue(pd.isna(summary["TotalRet"]))
+
     def test_run_alpha_pipeline_returns_expected_strategy_summary(self):
         from research.us_alpha_pipeline import run_alpha_pipeline
 
@@ -109,6 +124,41 @@ class USAlphaPipelineTests(unittest.TestCase):
         self.assertEqual(len(summary), 2)
         self.assertTrue(summary[["CAGR", "Sharpe", "MaxDD", "TotalRet"]].notna().all().all())
 
+    def test_run_saved_pit_alpha_pipeline_reads_saved_inputs(self):
+        from research.us_alpha_pipeline import run_saved_pit_alpha_pipeline
+
+        dates = pd.date_range("2024-01-01", periods=320, freq="D")
+        close = pd.DataFrame(
+            {
+                "AAA": [50.0 + 0.2 * i for i in range(320)],
+                "BBB": [40.0 + 0.1 * i for i in range(320)],
+            },
+            index=dates,
+        )
+        high = close + 1.0
+        low = close - 1.0
+        volume = pd.DataFrame({"AAA": [2_500_000.0] * 320, "BBB": [2_000_000.0] * 320}, index=dates)
+        etf_close = pd.DataFrame(
+            {"SPY": [300.0 + 0.8 * i for i in range(320)], "QQQ": [280.0 + 1.1 * i for i in range(320)]},
+            index=dates,
+        )
+
+        with self.subTest("saved_inputs"):
+            import tempfile
+
+            with tempfile.TemporaryDirectory() as tmpdir:
+                close.to_csv(Path(tmpdir) / "us_pit_close.csv")
+                high.to_csv(Path(tmpdir) / "us_pit_high.csv")
+                low.to_csv(Path(tmpdir) / "us_pit_low.csv")
+                volume.to_csv(Path(tmpdir) / "us_pit_volume.csv")
+                etf_close.to_csv(Path(tmpdir) / "us_etf.csv")
+
+                intervals = {"AAA": [[None, None]], "BBB": [[None, None]]}
+                with mock.patch("research.us_alpha_pipeline.uh.load_sp500_history", return_value=intervals):
+                    summary = run_saved_pit_alpha_pipeline(data_dir=tmpdir, windows=(1,), top_n=1)
+
+        self.assertEqual(set(summary["strategy"]), {"breakout_regime", "rank_blend_regime"})
+
 
 if __name__ == "__main__":
     unittest.main()