diff --git a/data_manager.py b/data_manager.py index f1f4d4e..c156b8b 100644 --- a/data_manager.py +++ b/data_manager.py @@ -63,10 +63,11 @@ def _download(tickers: list[str], start: str, end: str | None = None, result = {} for field in fields: if field in raw.columns.get_level_values(0) if isinstance(raw.columns, pd.MultiIndex) else field in raw.columns: - if len(tickers) > 1: - result[field] = raw[field] + selected = raw[field] + if isinstance(selected, pd.Series): + result[field] = selected.to_frame(name=tickers[0]) else: - result[field] = raw[field].to_frame(name=tickers[0]) + result[field] = selected else: result[field] = pd.DataFrame() return result @@ -83,10 +84,11 @@ def _download_period(tickers: list[str], period: str, result = {} for field in fields: if field in raw.columns.get_level_values(0) if isinstance(raw.columns, pd.MultiIndex) else field in raw.columns: - if len(tickers) > 1: - result[field] = raw[field] + selected = raw[field] + if isinstance(selected, pd.Series): + result[field] = selected.to_frame(name=tickers[0]) else: - result[field] = raw[field].to_frame(name=tickers[0]) + result[field] = selected else: result[field] = pd.DataFrame() return result @@ -103,6 +105,66 @@ def _clean(data: pd.DataFrame) -> pd.DataFrame: return data +def _clean_market_data(data: pd.DataFrame, field: str) -> pd.DataFrame: + """Clean market data while preserving volume gaps.""" + good = data.columns[data.notna().mean() > 0.5] + dropped = set(data.columns) - set(good) + if dropped: + print(f"--- Dropped {len(dropped)} tickers with >50% missing data ---") + data = data[good] + if field == "volume": + return data + return data.ffill().dropna(how="all") + + +def _merge_market_panel(existing: pd.DataFrame | None, new_data: pd.DataFrame) -> pd.DataFrame: + """Merge new data into an existing cached panel, preserving old columns and dates.""" + if existing is None or existing.empty: + merged = new_data.copy() + elif new_data.empty: + merged = existing.copy() + else: + merged = existing.combine_first(new_data) + merged.loc[new_data.index, new_data.columns] = new_data + merged = merged.sort_index() + merged = merged[~merged.index.duplicated(keep="last")] + return merged + + +def update_market_data(market: str, tickers: list[str], fields: list[str]) -> dict[str, pd.DataFrame]: + """Download, clean, persist, and return market data panels for requested Yahoo fields.""" + field_aliases = { + "close": "Close", + "open": "Open", + "high": "High", + "low": "Low", + "volume": "Volume", + } + normalized_fields = [] + yahoo_fields = [] + for field in fields: + normalized = field.lower() + if normalized not in field_aliases: + raise ValueError(f"Unsupported market data field: {field}") + normalized_fields.append(normalized) + yahoo_fields.append(field_aliases[normalized]) + + os.makedirs(DATA_DIR, exist_ok=True) + start = (datetime.now() - timedelta(days=365 * 10)).strftime("%Y-%m-%d") + downloaded = _download(tickers, start=start, fields=yahoo_fields) + + cleaned = {} + for normalized, yahoo_field in zip(normalized_fields, yahoo_fields): + data = _clean_market_data(downloaded.get(yahoo_field, pd.DataFrame()), normalized) + existing = load(market, normalized) + data = _merge_market_panel(existing, data) + path = _data_path(market, normalized) + data.to_csv(path) + print(f"--- Saved {data.shape[0]} days x {data.shape[1]} tickers to {path} ---") + cleaned[normalized] = data + return cleaned + + def update(market: str, tickers: list[str], with_open: bool = False) -> pd.DataFrame | tuple[pd.DataFrame, pd.DataFrame]: """ diff --git a/research/event_factors.py b/research/event_factors.py new file mode 100644 index 0000000..e1412bf --- /dev/null +++ b/research/event_factors.py @@ -0,0 +1,34 @@ +import numpy as np +import pandas as pd + + +TRAILING_HIGH_WINDOW = 60 +COMPRESSION_WINDOW = 20 +VOLUME_WINDOW = 20 + + +def breakout_after_compression_score( + close: pd.DataFrame, + high: pd.DataFrame, + low: pd.DataFrame, + volume: pd.DataFrame, +) -> pd.DataFrame: + """Score breakout setups and shift the result so it is tradable next day.""" + close = close.sort_index() + high = high.reindex(index=close.index, columns=close.columns).sort_index() + low = low.reindex(index=close.index, columns=close.columns).sort_index() + volume = volume.reindex(index=close.index, columns=close.columns).sort_index() + + trailing_high = close.rolling(TRAILING_HIGH_WINDOW, min_periods=TRAILING_HIGH_WINDOW).max() + proximity_to_high = close / trailing_high.replace(0, np.nan) + + recent_high = high.rolling(COMPRESSION_WINDOW, min_periods=COMPRESSION_WINDOW).max() + recent_low = low.rolling(COMPRESSION_WINDOW, min_periods=COMPRESSION_WINDOW).min() + recent_mid = (recent_high + recent_low) / 2 + compressed_range = -((recent_high - recent_low) / recent_mid.replace(0, np.nan)) + + median_volume = volume.rolling(VOLUME_WINDOW, min_periods=VOLUME_WINDOW).median() + abnormal_volume = volume / median_volume.replace(0, np.nan) + + score = proximity_to_high + compressed_range + abnormal_volume + return score.shift(1) diff --git a/research/fetch_historical.py b/research/fetch_historical.py index 12c2940..9ab48ad 100644 --- a/research/fetch_historical.py +++ b/research/fetch_historical.py @@ -25,6 +25,15 @@ YEARS = 10 BATCH_SIZE = 50 +def _field_out_paths() -> dict[str, str]: + return { + "Close": os.path.join(DATA_DIR, "us_pit_close.csv"), + "High": os.path.join(DATA_DIR, "us_pit_high.csv"), + "Low": os.path.join(DATA_DIR, "us_pit_low.csv"), + "Volume": os.path.join(DATA_DIR, "us_pit_volume.csv"), + } + + def fetch_all_historical(force: bool = False) -> pd.DataFrame: os.makedirs(DATA_DIR, exist_ok=True) intervals = uh.load_sp500_history() @@ -74,8 +83,41 @@ def fetch_all_historical(force: bool = False) -> pd.DataFrame: return combined +def fetch_all_historical_ohlcv(force: bool = False) -> dict[str, pd.DataFrame]: + os.makedirs(DATA_DIR, exist_ok=True) + intervals = uh.load_sp500_history() + tickers = uh.all_tickers_ever(intervals) + ["SPY"] + tickers = sorted(set(tickers)) + start = (datetime.now() - timedelta(days=365 * YEARS)).strftime("%Y-%m-%d") + panels = _download_batched_fields(tickers, start=start, fields=["Close", "High", "Low", "Volume"]) + if not panels: + raise RuntimeError("No PIT OHLCV data downloaded") + + close = panels["Close"] + close.to_csv(OUT_PATH) + print(f"--- Saved {close.shape} to {OUT_PATH} ---") + result: dict[str, pd.DataFrame] = {"close": close} + for field, path in _field_out_paths().items(): + panel = panels[field] + panel.to_csv(path) + print(f"--- Saved {panel.shape} to {path} ---") + result[field.lower()] = panel + return result + + def _download_batched(tickers: list[str], start: str) -> pd.DataFrame | None: - frames = [] + panels = _download_batched_fields(tickers, start=start, fields=["Close"]) + if not panels: + return None + return panels["Close"] + + +def _download_batched_fields( + tickers: list[str], + start: str, + fields: list[str], +) -> dict[str, pd.DataFrame]: + frames = {field: [] for field in fields} n = len(tickers) for i in range(0, n, BATCH_SIZE): batch = tickers[i:i + BATCH_SIZE] @@ -85,19 +127,24 @@ def _download_batched(tickers: list[str], start: str) -> pd.DataFrame | None: progress=False, threads=True) if raw.empty: continue - if isinstance(raw.columns, pd.MultiIndex): - close = raw["Close"] - else: - close = raw[["Close"]].rename(columns={"Close": batch[0]}) - close = close.dropna(axis=1, how="all") - if not close.empty: - frames.append(close) + for field in fields: + if isinstance(raw.columns, pd.MultiIndex): + panel = raw[field] + else: + panel = raw[[field]].rename(columns={field: batch[0]}) + panel = panel.dropna(axis=1, how="all") + if not panel.empty: + frames[field].append(panel) except Exception as e: print(f" batch failed: {e}") - if not frames: - return None - result = pd.concat(frames, axis=1).sort_index() - result = result.loc[:, ~result.columns.duplicated()] + result = {} + for field, field_frames in frames.items(): + if field_frames: + panel = pd.concat(field_frames, axis=1).sort_index() + panel = panel.loc[:, ~panel.columns.duplicated()] + result[field] = panel + else: + result[field] = pd.DataFrame() return result diff --git a/research/regime_filters.py b/research/regime_filters.py new file mode 100644 index 0000000..4d9e4c5 --- /dev/null +++ b/research/regime_filters.py @@ -0,0 +1,23 @@ +import pandas as pd + + +LONG_MA_WINDOW = 200 +RS_WINDOW = 63 + + +def build_regime_filter(etf_close: pd.DataFrame, market_col: str = "SPY") -> pd.Series: + """Return a next-day tradable regime flag based on market trend and ETF leadership.""" + prices = etf_close.sort_index() + if market_col not in prices.columns: + raise KeyError(f"{market_col} not found in etf_close") + + market = prices[market_col] + market_ma = market.rolling(LONG_MA_WINDOW, min_periods=LONG_MA_WINDOW).mean() + market_ok = market.gt(market_ma) + + rs = prices.pct_change(RS_WINDOW, fill_method=None) + non_market_rs = rs.drop(columns=[market_col], errors="ignore") + leader_ok = non_market_rs.gt(rs[market_col], axis=0).any(axis=1) + + regime = (market_ok & leader_ok).astype(bool) + return regime.shift(1, fill_value=False) diff --git a/research/us_alpha_pipeline.py b/research/us_alpha_pipeline.py new file mode 100644 index 0000000..c654928 --- /dev/null +++ b/research/us_alpha_pipeline.py @@ -0,0 +1,156 @@ +import numpy as np +import pandas as pd + +import data_manager +import universe_history as uh +from research.event_factors import breakout_after_compression_score +from research.regime_filters import build_regime_filter +from research.us_alpha_report import summarize_equity_window +from research.us_universe import build_tradable_mask + + +MIN_PRICE = 5.0 +MIN_DOLLAR_VOLUME = 20_000_000.0 +MIN_HISTORY_DAYS = 252 +MIN_VALID_VOLUME_DAYS = 40 +LIQUIDITY_WINDOW = 60 + +TREND_WINDOW = 126 +RECOVERY_WINDOW = 63 +HIGH_PROX_WINDOW = 126 +ETF_TICKERS = ["SPY", "QQQ", "IWM", "MDY", "XLK", "XLF", "XLI", "XLV"] + + +def _price_rank_blend_score(close: pd.DataFrame) -> pd.DataFrame: + """Simple price-only cross-sectional blend, shifted for next-day trading.""" + trend = close.pct_change(TREND_WINDOW, fill_method=None) + recovery = close / close.rolling(RECOVERY_WINDOW, min_periods=RECOVERY_WINDOW).min() - 1 + high_proximity = close / close.rolling(HIGH_PROX_WINDOW, min_periods=HIGH_PROX_WINDOW).max().replace(0, np.nan) + + trend_rank = trend.rank(axis=1, pct=True, na_option="keep") + recovery_rank = recovery.rank(axis=1, pct=True, na_option="keep") + high_rank = high_proximity.rank(axis=1, pct=True, na_option="keep") + return ((trend_rank + recovery_rank + high_rank) / 3.0).shift(1) + + +def _build_equal_weight_portfolio( + score: pd.DataFrame, + tradable_mask: pd.DataFrame, + regime_filter: pd.Series, + top_n: int, +) -> pd.DataFrame: + """Build equal-weight top-n long-only weights from aligned scores.""" + aligned_score = score.reindex(index=tradable_mask.index, columns=tradable_mask.columns) + eligible_score = aligned_score.where(tradable_mask) + rank = eligible_score.rank(axis=1, ascending=False, na_option="bottom", method="first") + selected = (rank <= top_n) & eligible_score.notna() + selected = selected & regime_filter.reindex(tradable_mask.index, fill_value=False).to_numpy().reshape(-1, 1) + + raw = selected.astype(float) + row_sums = raw.sum(axis=1).replace(0.0, np.nan) + return raw.div(row_sums, axis=0).fillna(0.0) + + +def _equity_curve(close: pd.DataFrame, weights: pd.DataFrame) -> pd.Series: + """Convert daily weights into a simple close-to-close equity curve.""" + returns = close.pct_change(fill_method=None).fillna(0.0) + portfolio_returns = (returns * weights).sum(axis=1) + return (1.0 + portfolio_returns).cumprod() + + +def _read_panel_csv(path: str) -> pd.DataFrame: + return pd.read_csv(path, index_col=0, parse_dates=True).sort_index() + + +def load_saved_pit_market_data(data_dir: str = "data", prefix: str = "us_pit") -> dict[str, pd.DataFrame]: + """Load saved PIT OHLCV panels from disk.""" + panels = {} + for field in ("close", "high", "low", "volume"): + panels[field] = _read_panel_csv(f"{data_dir}/{prefix}_{field}.csv") + return panels + + +def load_saved_etf_close(data_dir: str = "data", market: str = "us_etf") -> pd.DataFrame: + """Load saved ETF closes or populate them on demand.""" + path = f"{data_dir}/{market}.csv" + try: + return _read_panel_csv(path) + except FileNotFoundError: + original_data_dir = data_manager.DATA_DIR + try: + data_manager.DATA_DIR = data_dir + return data_manager.update_market_data(market, ETF_TICKERS, ["close"])["close"] + finally: + data_manager.DATA_DIR = original_data_dir + + +def run_alpha_pipeline( + market_data, + etf_close, + pit_membership=None, + windows=(1, 2, 3, 5, 10), + top_n=10, +) -> pd.DataFrame: + """Run a lightweight strict US alpha pipeline and summarize trailing windows.""" + close = market_data["close"].sort_index() + high = market_data["high"].reindex(index=close.index, columns=close.columns).sort_index() + low = market_data["low"].reindex(index=close.index, columns=close.columns).sort_index() + volume = market_data["volume"].reindex(index=close.index, columns=close.columns).sort_index() + + tradable_mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=pit_membership, + min_price=MIN_PRICE, + min_dollar_volume=MIN_DOLLAR_VOLUME, + min_history_days=MIN_HISTORY_DAYS, + min_valid_volume_days=MIN_VALID_VOLUME_DAYS, + liquidity_window=LIQUIDITY_WINDOW, + ) + regime_filter = build_regime_filter(etf_close).reindex(close.index, fill_value=False) + + strategy_scores = { + "breakout_regime": breakout_after_compression_score(close, high, low, volume), + "rank_blend_regime": _price_rank_blend_score(close), + } + + summary_rows = [] + for strategy_name, score in strategy_scores.items(): + weights = _build_equal_weight_portfolio(score, tradable_mask, regime_filter, top_n) + equity = _equity_curve(close, weights) + for window_years in windows: + summary_rows.append(summarize_equity_window(equity, strategy_name, window_years)) + + return pd.DataFrame(summary_rows) + + +def run_saved_pit_alpha_pipeline( + data_dir: str = "data", + windows=(1, 2, 3, 5, 10), + top_n: int = 10, +) -> pd.DataFrame: + """Load saved PIT OHLCV inputs and run the strict alpha pipeline.""" + market_data = load_saved_pit_market_data(data_dir=data_dir) + etf_close = load_saved_etf_close(data_dir=data_dir) + intervals = uh.load_sp500_history() + pit_membership = uh.membership_mask( + market_data["close"].index, + intervals=intervals, + tickers=list(market_data["close"].columns), + ) + return run_alpha_pipeline( + market_data=market_data, + etf_close=etf_close, + pit_membership=pit_membership, + windows=windows, + top_n=top_n, + ) + + +def main() -> None: + summary = run_saved_pit_alpha_pipeline() + print(summary.to_string(index=False)) + + +if __name__ == "__main__": + main() diff --git a/research/us_alpha_report.py b/research/us_alpha_report.py new file mode 100644 index 0000000..ee8db90 --- /dev/null +++ b/research/us_alpha_report.py @@ -0,0 +1,37 @@ +import numpy as np +import pandas as pd + + +TRADING_DAYS_PER_YEAR = 252 + + +def summarize_equity_window(equity: pd.Series, strategy: str, window_years: int | float) -> dict: + """Summarize a strategy equity curve over a trailing trading-day window.""" + window_days = max(int(window_years * TRADING_DAYS_PER_YEAR), 1) + clean_equity = equity.dropna() + if len(clean_equity) < window_days + 1: + return { + "strategy": strategy, + "window_years": window_years, + "CAGR": np.nan, + "Sharpe": np.nan, + "MaxDD": np.nan, + "TotalRet": np.nan, + } + window_equity = clean_equity.tail(window_days + 1) + + daily = window_equity.pct_change(fill_method=None).dropna() + total_ret = window_equity.iloc[-1] / window_equity.iloc[0] - 1 + years = len(daily) / TRADING_DAYS_PER_YEAR + cagr = (window_equity.iloc[-1] / window_equity.iloc[0]) ** (1 / years) - 1 if years > 0 else np.nan + vol = daily.std() * np.sqrt(TRADING_DAYS_PER_YEAR) + sharpe = (daily.mean() * TRADING_DAYS_PER_YEAR) / vol if vol > 0 else 0.0 + max_dd = (window_equity / window_equity.cummax() - 1).min() + return { + "strategy": strategy, + "window_years": window_years, + "CAGR": cagr, + "Sharpe": sharpe, + "MaxDD": max_dd, + "TotalRet": total_ret, + } diff --git a/research/us_universe.py b/research/us_universe.py new file mode 100644 index 0000000..89f7c23 --- /dev/null +++ b/research/us_universe.py @@ -0,0 +1,53 @@ +import pandas as pd + + +def build_tradable_mask( + close: pd.DataFrame, + volume: pd.DataFrame, + pit_membership: pd.DataFrame | None, + min_price: float, + min_dollar_volume: float, + min_history_days: int, + min_valid_volume_days: int, + liquidity_window: int = 60, +) -> pd.DataFrame: + """Build a point-in-time tradable universe mask using only lagged inputs.""" + close = close.sort_index() + volume = volume.reindex(index=close.index, columns=close.columns).sort_index() + if pit_membership is None: + pit_mask = pd.DataFrame(True, index=close.index, columns=close.columns) + else: + pit_mask = pit_membership.reindex( + index=close.index, + columns=close.columns, + fill_value=False, + ) + pit_mask = pit_mask.where(pit_mask.notna(), False).astype(bool) + + eligible_close = close.where(pit_mask) + eligible_volume = volume.where(pit_mask) + + lagged_close = eligible_close.shift(1) + lagged_volume = eligible_volume.shift(1) + lagged_dollar_volume = lagged_close * lagged_volume + + price_ok = lagged_close.gt(min_price) + liquidity_ok = ( + lagged_dollar_volume.rolling(window=liquidity_window, min_periods=1).median().gt(min_dollar_volume) + ) + history_ok = ( + lagged_close.notna() + .rolling(window=min_history_days, min_periods=min_history_days) + .sum() + .ge(min_history_days) + ) + valid_volume_ok = ( + lagged_dollar_volume.notna() + .rolling(window=liquidity_window, min_periods=1) + .sum() + .ge(min_valid_volume_days) + ) + + mask = price_ok & liquidity_ok & history_ok & valid_volume_ok + mask = mask & pit_mask + return mask.astype(bool) diff --git a/tests/test_alpha_signals.py b/tests/test_alpha_signals.py new file mode 100644 index 0000000..c2f4356 --- /dev/null +++ b/tests/test_alpha_signals.py @@ -0,0 +1,118 @@ +import unittest +import warnings + +import numpy as np +import pandas as pd + + +class AlphaSignalTests(unittest.TestCase): + def test_build_regime_filter_requires_market_trend_and_non_market_leader(self): + from research.regime_filters import build_regime_filter + + dates = pd.date_range("2023-01-01", periods=260, freq="D") + spy = pd.Series([100.0 + i for i in range(260)], index=dates) + qqq_leader = pd.Series([100.0 + 1.4 * i for i in range(260)], index=dates) + xlu = pd.Series([100.0 + 0.2 * i for i in range(260)], index=dates) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + bullish = build_regime_filter(pd.DataFrame({"SPY": spy, "QQQ": qqq_leader, "XLU": xlu})) + qqq_laggard = pd.Series([100.0 + 0.5 * i for i in range(260)], index=dates) + no_leader = build_regime_filter(pd.DataFrame({"SPY": spy, "QQQ": qqq_laggard, "XLU": xlu})) + + self.assertEqual(len(caught), 0) + self.assertFalse(bool(bullish.iloc[199])) + self.assertTrue(bool(bullish.iloc[-1])) + self.assertFalse(bool(no_leader.iloc[-1])) + + def test_build_regime_filter_handles_internal_missing_prices_without_warnings(self): + from research.regime_filters import build_regime_filter + + dates = pd.date_range("2023-01-01", periods=260, freq="D") + spy = pd.Series([100.0 + i for i in range(260)], index=dates) + qqq = pd.Series([100.0 + 1.4 * i for i in range(260)], index=dates) + qqq.iloc[120] = np.nan + etf_close = pd.DataFrame({"SPY": spy, "QQQ": qqq, "XLU": 100.0}, index=dates) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + regime = build_regime_filter(etf_close) + + self.assertEqual(len(caught), 0) + self.assertEqual(str(regime.dtype), "bool") + + def test_breakout_after_compression_score_is_shifted_and_rewards_breakout_profile(self): + from research.event_factors import breakout_after_compression_score + + dates = pd.date_range("2024-01-01", periods=80, freq="D") + + aaa_close = [100.0 + i for i in range(60)] + [159.0 + 0.05 * i for i in range(20)] + bbb_close = [100.0 + i for i in range(60)] + [150.0 - i for i in range(20)] + close = pd.DataFrame({"AAA": aaa_close, "BBB": bbb_close}, index=dates) + + high = pd.DataFrame( + { + "AAA": [value + 0.4 for value in aaa_close], + "BBB": [value + 4.0 for value in bbb_close], + }, + index=dates, + ) + low = pd.DataFrame( + { + "AAA": [value - 0.4 for value in aaa_close], + "BBB": [value - 4.0 for value in bbb_close], + }, + index=dates, + ) + volume = pd.DataFrame( + { + "AAA": [1_000.0] * 79 + [1_000.0], + "BBB": [1_000.0] * 80, + }, + index=dates, + ) + volume.loc[dates[-2], "AAA"] = 6_000.0 + + shifted_result = breakout_after_compression_score(close, high, low, volume) + self.assertGreater( + shifted_result.loc[dates[-1], "AAA"], + shifted_result.loc[dates[-1], "BBB"], + ) + + changed_last_day = close.copy() + changed_last_day_high = high.copy() + changed_last_day_low = low.copy() + changed_last_day_volume = volume.copy() + changed_last_day.loc[dates[-1], "AAA"] = 120.0 + changed_last_day_high.loc[dates[-1], "AAA"] = 130.0 + changed_last_day_low.loc[dates[-1], "AAA"] = 110.0 + changed_last_day_volume.loc[dates[-1], "AAA"] = 20_000.0 + + last_day_changed_result = breakout_after_compression_score( + changed_last_day, + changed_last_day_high, + changed_last_day_low, + changed_last_day_volume, + ) + self.assertEqual( + shifted_result.loc[dates[-1], "AAA"], + last_day_changed_result.loc[dates[-1], "AAA"], + ) + + def test_breakout_after_compression_score_keeps_float_output_when_denominators_hit_zero(self): + from research.event_factors import breakout_after_compression_score + + dates = pd.date_range("2024-01-01", periods=70, freq="D") + close = pd.DataFrame({"AAA": [10.0] * 70}, index=dates) + high = pd.DataFrame({"AAA": [10.0] * 70}, index=dates) + low = pd.DataFrame({"AAA": [10.0] * 70}, index=dates) + volume = pd.DataFrame({"AAA": [0.0] * 70}, index=dates) + + score = breakout_after_compression_score(close, high, low, volume) + + self.assertEqual(str(score.dtypes["AAA"]), "float64") + self.assertTrue(pd.isna(score.iloc[-1]["AAA"])) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_fetch_historical.py b/tests/test_fetch_historical.py new file mode 100644 index 0000000..e0ca391 --- /dev/null +++ b/tests/test_fetch_historical.py @@ -0,0 +1,46 @@ +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +import pandas as pd + +from research import fetch_historical + + +class FetchHistoricalTests(unittest.TestCase): + def test_fetch_all_historical_ohlcv_writes_field_specific_csvs(self): + dates = pd.to_datetime(["2024-01-02", "2024-01-03"]) + raw = pd.DataFrame( + { + ("Close", "AAA"): [10.0, 11.0], + ("Close", "BBB"): [20.0, 21.0], + ("High", "AAA"): [10.5, 11.5], + ("High", "BBB"): [20.5, 21.5], + ("Low", "AAA"): [9.5, 10.5], + ("Low", "BBB"): [19.5, 20.5], + ("Volume", "AAA"): [1000.0, 1100.0], + ("Volume", "BBB"): [2000.0, 2100.0], + }, + index=dates, + ) + raw.columns = pd.MultiIndex.from_tuples(raw.columns) + + with tempfile.TemporaryDirectory() as tmpdir: + with mock.patch.object(fetch_historical, "DATA_DIR", tmpdir): + with mock.patch.object(fetch_historical, "OUT_PATH", str(Path(tmpdir) / "us_pit.csv")): + with mock.patch("research.fetch_historical.uh.load_sp500_history", return_value={"AAA": [[None, None]], "BBB": [[None, None]]}): + with mock.patch("research.fetch_historical.uh.all_tickers_ever", return_value=["AAA", "BBB"]): + with mock.patch("research.fetch_historical.yf.download", return_value=raw): + panels = fetch_historical.fetch_all_historical_ohlcv(force=True) + + self.assertEqual(set(panels.keys()), {"close", "high", "low", "volume"}) + self.assertTrue((Path(tmpdir) / "us_pit.csv").exists()) + self.assertTrue((Path(tmpdir) / "us_pit_close.csv").exists()) + self.assertTrue((Path(tmpdir) / "us_pit_high.csv").exists()) + self.assertTrue((Path(tmpdir) / "us_pit_low.csv").exists()) + self.assertTrue((Path(tmpdir) / "us_pit_volume.csv").exists()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_market_data.py b/tests/test_market_data.py new file mode 100644 index 0000000..9a0241f --- /dev/null +++ b/tests/test_market_data.py @@ -0,0 +1,144 @@ +import tempfile +import unittest +from pathlib import Path +from unittest import mock + +import pandas as pd + +import data_manager + + +class UpdateMarketDataTests(unittest.TestCase): + def test_update_market_data_accepts_lowercase_fields_and_does_not_fill_volume(self): + dates = pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"]) + raw = pd.DataFrame( + { + ("Close", "AAA"): [10.0, 11.0, 12.0], + ("Close", "BBB"): [20.0, float("nan"), 22.0], + ("Open", "AAA"): [9.5, 10.5, 11.5], + ("Open", "BBB"): [19.5, 20.5, 21.5], + ("High", "AAA"): [10.5, 11.5, 12.5], + ("High", "BBB"): [20.5, 21.5, 22.5], + ("Low", "AAA"): [9.0, 10.0, 11.0], + ("Low", "BBB"): [19.0, 20.0, 21.0], + ("Volume", "AAA"): [1000, 1100, 1200], + ("Volume", "BBB"): [2000, float("nan"), 2200], + }, + index=dates, + ) + raw.columns = pd.MultiIndex.from_tuples(raw.columns) + + with tempfile.TemporaryDirectory() as tmpdir: + with mock.patch.object(data_manager, "DATA_DIR", tmpdir): + with mock.patch("data_manager.yf.download", return_value=raw) as mocked_download: + panels = data_manager.update_market_data( + "us", + ["AAA", "BBB"], + ["close", "open", "high", "low", "volume"], + ) + self.assertEqual(set(panels), {"close", "open", "high", "low", "volume"}) + self.assertEqual(panels["close"].loc[dates[1], "BBB"], 20.0) + self.assertTrue(pd.isna(panels["volume"].loc[dates[1], "BBB"])) + self.assertTrue((Path(tmpdir) / "us.csv").exists()) + self.assertTrue((Path(tmpdir) / "us_open.csv").exists()) + self.assertTrue((Path(tmpdir) / "us_high.csv").exists()) + self.assertTrue((Path(tmpdir) / "us_low.csv").exists()) + self.assertTrue((Path(tmpdir) / "us_volume.csv").exists()) + + saved_high = pd.read_csv(Path(tmpdir) / "us_high.csv", index_col=0, parse_dates=True) + pd.testing.assert_frame_equal(saved_high, panels["high"], check_freq=False) + + self.assertEqual(mocked_download.call_args.args[0], ["AAA", "BBB"]) + self.assertEqual(mocked_download.call_args.kwargs["auto_adjust"], True) + self.assertIn("start", mocked_download.call_args.kwargs) + + def test_update_market_data_rejects_unsupported_fields(self): + with tempfile.TemporaryDirectory() as tmpdir: + with mock.patch.object(data_manager, "DATA_DIR", tmpdir): + with self.assertRaisesRegex(ValueError, "Unsupported market data field: adjusted_close"): + data_manager.update_market_data("us", ["AAA"], ["adjusted_close"]) + + def test_update_market_data_preserves_existing_cache_columns_and_dates(self): + existing_dates = pd.to_datetime(["2024-01-01", "2024-01-02"]) + new_dates = pd.to_datetime(["2024-01-02", "2024-01-03"]) + existing_close = pd.DataFrame( + { + "AAA": [9.0, 10.0], + "CCC": [30.0, 31.0], + }, + index=existing_dates, + ) + downloaded_close = pd.DataFrame({"Close": [10.5, 11.5]}, index=new_dates) + + with tempfile.TemporaryDirectory() as tmpdir: + existing_close.to_csv(Path(tmpdir) / "us.csv") + with mock.patch.object(data_manager, "DATA_DIR", tmpdir): + with mock.patch("data_manager.yf.download", return_value=downloaded_close): + panels = data_manager.update_market_data("us", ["AAA"], ["close"]) + + expected = pd.DataFrame( + { + "AAA": [9.0, 10.5, 11.5], + "CCC": [30.0, 31.0, float("nan")], + }, + index=pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]), + ) + saved_close = pd.read_csv(Path(tmpdir) / "us.csv", index_col=0, parse_dates=True) + + pd.testing.assert_frame_equal(panels["close"], expected, check_freq=False) + pd.testing.assert_frame_equal(saved_close, expected, check_freq=False) + + def test_update_market_data_volume_merge_can_clear_stale_cached_values(self): + existing_dates = pd.to_datetime(["2024-01-01", "2024-01-02"]) + new_dates = pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"]) + existing_volume = pd.DataFrame( + { + "AAA": [1000.0, 9999.0], + "CCC": [3000.0, 3100.0], + }, + index=existing_dates, + ) + downloaded_volume = pd.DataFrame({"Volume": [float("nan"), 1200.0, 1300.0]}, index=new_dates) + + with tempfile.TemporaryDirectory() as tmpdir: + existing_volume.to_csv(Path(tmpdir) / "us_volume.csv") + with mock.patch.object(data_manager, "DATA_DIR", tmpdir): + with mock.patch("data_manager.yf.download", return_value=downloaded_volume): + panels = data_manager.update_market_data("us", ["AAA"], ["volume"]) + + expected = pd.DataFrame( + { + "AAA": [1000.0, float("nan"), 1200.0, 1300.0], + "CCC": [3000.0, 3100.0, float("nan"), float("nan")], + }, + index=pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"]), + ) + saved_volume = pd.read_csv(Path(tmpdir) / "us_volume.csv", index_col=0, parse_dates=True) + + pd.testing.assert_frame_equal(panels["volume"], expected, check_freq=False) + pd.testing.assert_frame_equal(saved_volume, expected, check_freq=False) + + def test_update_market_data_handles_single_ticker_multiindex_download(self): + dates = pd.to_datetime(["2024-01-02", "2024-01-03"]) + raw = pd.DataFrame( + { + ("Close", "AAA"): [10.0, 11.0], + ("Volume", "AAA"): [1000.0, 1100.0], + }, + index=dates, + ) + raw.columns = pd.MultiIndex.from_tuples(raw.columns) + + with tempfile.TemporaryDirectory() as tmpdir: + with mock.patch.object(data_manager, "DATA_DIR", tmpdir): + with mock.patch("data_manager.yf.download", return_value=raw): + panels = data_manager.update_market_data("us", ["AAA"], ["close", "volume"]) + + expected_close = pd.DataFrame({"AAA": [10.0, 11.0]}, index=dates) + expected_volume = pd.DataFrame({"AAA": [1000.0, 1100.0]}, index=dates) + pd.testing.assert_frame_equal(panels["close"], expected_close, check_freq=False) + pd.testing.assert_frame_equal(panels["volume"], expected_volume, check_freq=False) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_us_alpha_pipeline.py b/tests/test_us_alpha_pipeline.py new file mode 100644 index 0000000..754114f --- /dev/null +++ b/tests/test_us_alpha_pipeline.py @@ -0,0 +1,164 @@ +import unittest +from pathlib import Path +from unittest import mock + +import pandas as pd + + +class USAlphaPipelineTests(unittest.TestCase): + def test_build_equal_weight_portfolio_caps_holdings_under_ties(self): + from research.us_alpha_pipeline import _build_equal_weight_portfolio + + dates = pd.date_range("2024-01-01", periods=2, freq="D") + score = pd.DataFrame( + { + "AAA": [0.9, 0.9], + "BBB": [0.9, 0.9], + "CCC": [0.9, 0.9], + }, + index=dates, + ) + tradable_mask = pd.DataFrame(True, index=dates, columns=score.columns) + regime = pd.Series([True, True], index=dates) + + weights = _build_equal_weight_portfolio(score, tradable_mask, regime, top_n=2) + + self.assertEqual(int((weights.iloc[-1] > 0).sum()), 2) + self.assertAlmostEqual(float(weights.iloc[-1].sum()), 1.0) + + def test_equity_curve_uses_prior_day_weights_for_returns(self): + from research.us_alpha_pipeline import _equity_curve + + dates = pd.date_range("2024-01-01", periods=3, freq="D") + close = pd.DataFrame({"AAA": [1.0, 2.0, 4.0]}, index=dates) + weights = pd.DataFrame({"AAA": [0.0, 1.0, 0.0]}, index=dates) + + equity = _equity_curve(close, weights) + + self.assertEqual(float(equity.iloc[1]), 2.0) + self.assertEqual(float(equity.iloc[2]), 2.0) + + def test_summarize_equity_window_returns_nans_when_history_is_too_short(self): + from research.us_alpha_report import summarize_equity_window + + dates = pd.date_range("2024-01-01", periods=10, freq="D") + equity = pd.Series([1.0 + 0.01 * i for i in range(10)], index=dates) + + summary = summarize_equity_window(equity, "demo", window_years=1) + + self.assertTrue(pd.isna(summary["CAGR"])) + self.assertTrue(pd.isna(summary["Sharpe"])) + self.assertTrue(pd.isna(summary["MaxDD"])) + self.assertTrue(pd.isna(summary["TotalRet"])) + + def test_run_alpha_pipeline_returns_expected_strategy_summary(self): + from research.us_alpha_pipeline import run_alpha_pipeline + + dates = pd.date_range("2023-01-01", periods=400, freq="D") + + aaa_close = [50.0 + 0.20 * i for i in range(400)] + bbb_close = [55.0 + 0.12 * i for i in range(400)] + ccc_close = [60.0 + 0.05 * i for i in range(400)] + close = pd.DataFrame( + { + "AAA": aaa_close, + "BBB": bbb_close, + "CCC": ccc_close, + }, + index=dates, + ) + high = pd.DataFrame( + { + "AAA": [value + 0.5 for value in aaa_close], + "BBB": [value + 1.0 for value in bbb_close], + "CCC": [value + 1.5 for value in ccc_close], + }, + index=dates, + ) + low = pd.DataFrame( + { + "AAA": [value - 0.5 for value in aaa_close], + "BBB": [value - 1.0 for value in bbb_close], + "CCC": [value - 1.5 for value in ccc_close], + }, + index=dates, + ) + volume = pd.DataFrame( + { + "AAA": [1_500_000.0] * 400, + "BBB": [1_400_000.0] * 400, + "CCC": [1_300_000.0] * 400, + }, + index=dates, + ) + volume.loc[dates[-2], "AAA"] = 4_000_000.0 + + etf_close = pd.DataFrame( + { + "SPY": [300.0 + 0.8 * i for i in range(400)], + "QQQ": [280.0 + 1.1 * i for i in range(400)], + "XLF": [200.0 + 0.4 * i for i in range(400)], + }, + index=dates, + ) + + market_data = { + "close": close, + "high": high, + "low": low, + "volume": volume, + } + + summary = run_alpha_pipeline( + market_data=market_data, + etf_close=etf_close, + pit_membership=None, + windows=(1,), + top_n=2, + ) + + required_columns = {"strategy", "window_years", "CAGR", "Sharpe", "MaxDD", "TotalRet"} + self.assertTrue(required_columns.issubset(summary.columns)) + self.assertEqual(set(summary["strategy"]), {"breakout_regime", "rank_blend_regime"}) + self.assertEqual(set(summary["window_years"]), {1}) + self.assertEqual(len(summary), 2) + self.assertTrue(summary[["CAGR", "Sharpe", "MaxDD", "TotalRet"]].notna().all().all()) + + def test_run_saved_pit_alpha_pipeline_reads_saved_inputs(self): + from research.us_alpha_pipeline import run_saved_pit_alpha_pipeline + + dates = pd.date_range("2024-01-01", periods=320, freq="D") + close = pd.DataFrame( + { + "AAA": [50.0 + 0.2 * i for i in range(320)], + "BBB": [40.0 + 0.1 * i for i in range(320)], + }, + index=dates, + ) + high = close + 1.0 + low = close - 1.0 + volume = pd.DataFrame({"AAA": [2_500_000.0] * 320, "BBB": [2_000_000.0] * 320}, index=dates) + etf_close = pd.DataFrame( + {"SPY": [300.0 + 0.8 * i for i in range(320)], "QQQ": [280.0 + 1.1 * i for i in range(320)]}, + index=dates, + ) + + with self.subTest("saved_inputs"): + import tempfile + + with tempfile.TemporaryDirectory() as tmpdir: + close.to_csv(Path(tmpdir) / "us_pit_close.csv") + high.to_csv(Path(tmpdir) / "us_pit_high.csv") + low.to_csv(Path(tmpdir) / "us_pit_low.csv") + volume.to_csv(Path(tmpdir) / "us_pit_volume.csv") + etf_close.to_csv(Path(tmpdir) / "us_etf.csv") + + intervals = {"AAA": [[None, None]], "BBB": [[None, None]]} + with mock.patch("research.us_alpha_pipeline.uh.load_sp500_history", return_value=intervals): + summary = run_saved_pit_alpha_pipeline(data_dir=tmpdir, windows=(1,), top_n=1) + + self.assertEqual(set(summary["strategy"]), {"breakout_regime", "rank_blend_regime"}) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_us_universe.py b/tests/test_us_universe.py new file mode 100644 index 0000000..983a789 --- /dev/null +++ b/tests/test_us_universe.py @@ -0,0 +1,213 @@ +import unittest +import warnings + +import pandas as pd + + +class BuildTradableMaskTests(unittest.TestCase): + def test_build_tradable_mask_uses_only_lagged_price_and_liquidity_inputs(self): + from research.us_universe import build_tradable_mask + + dates = pd.date_range("2024-01-01", periods=4, freq="D") + close = pd.DataFrame({"AAA": [4.0, 10.0, 10.0, 10.0]}, index=dates) + volume = pd.DataFrame({"AAA": [float("nan"), 200.0, 200.0, 200.0]}, index=dates) + + mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=None, + min_price=5.0, + min_dollar_volume=1000.0, + min_history_days=2, + min_valid_volume_days=2, + liquidity_window=2, + ) + + expected = pd.DataFrame({"AAA": [False, False, False, True]}, index=dates, dtype=bool) + pd.testing.assert_frame_equal(mask, expected) + + def test_build_tradable_mask_uses_only_lagged_history(self): + from research.us_universe import build_tradable_mask + + dates = pd.date_range("2024-01-01", periods=4, freq="D") + close = pd.DataFrame({"AAA": [10.0, float("nan"), 10.0, 10.0]}, index=dates) + volume = pd.DataFrame({"AAA": [200.0, 200.0, 200.0, 200.0]}, index=dates) + + mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=None, + min_price=5.0, + min_dollar_volume=1_000.0, + min_history_days=2, + min_valid_volume_days=1, + liquidity_window=1, + ) + + expected = pd.DataFrame({"AAA": [False, False, False, False]}, index=dates, dtype=bool) + pd.testing.assert_frame_equal(mask, expected) + + def test_build_tradable_mask_requires_membership_history_before_first_eligible_day(self): + from research.us_universe import build_tradable_mask + + dates = pd.date_range("2024-01-01", periods=4, freq="D") + close = pd.DataFrame({"AAA": [10.0, 10.0, 10.0, 10.0]}, index=dates) + volume = pd.DataFrame({"AAA": [200.0, 200.0, 200.0, 200.0]}, index=dates) + pit_membership = pd.DataFrame({"AAA": [False, False, True, True]}, index=dates) + + mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=pit_membership, + min_price=5.0, + min_dollar_volume=1_000.0, + min_history_days=1, + min_valid_volume_days=1, + liquidity_window=1, + ) + + expected = pd.DataFrame({"AAA": [False, False, False, True]}, index=dates, dtype=bool) + pd.testing.assert_frame_equal(mask, expected) + + def test_build_tradable_mask_aligns_pit_membership_without_truthy_carryover(self): + from research.us_universe import build_tradable_mask + + dates = pd.date_range("2024-01-01", periods=3, freq="D") + close = pd.DataFrame( + { + "AAA": [10.0, 10.0, 10.0], + "BBB": [12.0, 12.0, 12.0], + }, + index=dates, + ) + volume = pd.DataFrame( + { + "AAA": [1_000_000.0, 1_000_000.0, 1_000_000.0], + "BBB": [1_000_000.0, 1_000_000.0, 1_000_000.0], + }, + index=dates, + ) + pit_membership = pd.DataFrame( + { + "BBB": [True, True, False], + "CCC": [True, True, True], + }, + index=pd.date_range("2024-01-02", periods=3, freq="D"), + ) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=pit_membership, + min_price=5.0, + min_dollar_volume=1_000.0, + min_history_days=1, + min_valid_volume_days=1, + liquidity_window=1, + ) + + self.assertEqual(len(caught), 0) + expected = pd.DataFrame( + { + "AAA": [False, False, False], + "BBB": [False, False, True], + }, + index=dates, + dtype=bool, + ) + pd.testing.assert_frame_equal(mask, expected) + + def test_build_tradable_mask_treats_missing_membership_cells_as_false(self): + from research.us_universe import build_tradable_mask + + dates = pd.date_range("2024-01-01", periods=3, freq="D") + close = pd.DataFrame({"AAA": [10.0, 10.0, 10.0]}, index=dates) + volume = pd.DataFrame({"AAA": [1_000_000.0, 1_000_000.0, 1_000_000.0]}, index=dates) + pit_membership = pd.DataFrame( + {"AAA": [True, pd.NA, True]}, + index=dates, + dtype="boolean", + ) + + mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=pit_membership, + min_price=5.0, + min_dollar_volume=1_000.0, + min_history_days=1, + min_valid_volume_days=1, + liquidity_window=1, + ) + + expected = pd.DataFrame({"AAA": [False, False, False]}, index=dates, dtype=bool) + pd.testing.assert_frame_equal(mask, expected) + + def test_build_tradable_mask_uses_strict_thresholds(self): + from research.us_universe import build_tradable_mask + + dates = pd.date_range("2024-01-01", periods=3, freq="D") + close = pd.DataFrame({"AAA": [5.0, 5.0, 5.0]}, index=dates) + volume = pd.DataFrame({"AAA": [300.0, 300.0, 300.0]}, index=dates) + + mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=None, + min_price=5.0, + min_dollar_volume=1_000.0, + min_history_days=1, + min_valid_volume_days=1, + liquidity_window=1, + ) + + expected = pd.DataFrame({"AAA": [False, False, False]}, index=dates, dtype=bool) + pd.testing.assert_frame_equal(mask, expected) + + def test_build_tradable_mask_uses_strict_dollar_volume_threshold(self): + from research.us_universe import build_tradable_mask + + dates = pd.date_range("2024-01-01", periods=3, freq="D") + close = pd.DataFrame({"AAA": [8.0, 8.0, 8.0]}, index=dates) + volume = pd.DataFrame({"AAA": [125.0, 125.0, 125.0]}, index=dates) + + mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=None, + min_price=5.0, + min_dollar_volume=1_000.0, + min_history_days=1, + min_valid_volume_days=1, + liquidity_window=1, + ) + + expected = pd.DataFrame({"AAA": [False, False, False]}, index=dates, dtype=bool) + pd.testing.assert_frame_equal(mask, expected) + + def test_build_tradable_mask_requires_valid_dollar_volume_history(self): + from research.us_universe import build_tradable_mask + + dates = pd.date_range("2024-01-01", periods=4, freq="D") + close = pd.DataFrame({"AAA": [10.0, float("nan"), 10.0, 10.0]}, index=dates) + volume = pd.DataFrame({"AAA": [200.0, 200.0, 200.0, 200.0]}, index=dates) + + mask = build_tradable_mask( + close=close, + volume=volume, + pit_membership=None, + min_price=5.0, + min_dollar_volume=1_000.0, + min_history_days=1, + min_valid_volume_days=2, + liquidity_window=2, + ) + + expected = pd.DataFrame({"AAA": [False, False, False, False]}, index=dates, dtype=bool) + pd.testing.assert_frame_equal(mask, expected) + + +if __name__ == "__main__": + unittest.main()