Compare commits

...

11 Commits

Author SHA1 Message Date
cdaca4bc2a Merge branch 'feat/us-alpha-phase1' 2026-04-18 15:00:56 +08:00
f5e8c708f3 feat: add PIT OHLCV runner and fetch support 2026-04-18 14:59:48 +08:00
c015873ee1 feat: add strict US alpha research pipeline 2026-04-18 00:38:29 +08:00
bf6fccfd11 feat: add regime and breakout alpha modules 2026-04-18 00:31:16 +08:00
7853eafe55 feat: add PIT-aware tradable universe mask 2026-04-18 00:23:07 +08:00
1edce83430 fix: handle single-ticker yahoo panels 2026-04-18 00:03:07 +08:00
3abc51e3e3 feat: add OHLCV market data updater 2026-04-17 23:59:06 +08:00
7239310be3 docs: add US alpha research design spec 2026-04-17 23:41:10 +08:00
5e1c4a681d Add point-in-time S&P 500 backtest to expose survivorship bias
The existing framework fetches today's S&P 500 constituents from Wikipedia
and applies that list to the entire 10-year price history — classic
survivorship bias. Stocks that went bankrupt or were removed for poor
performance are absent, while today's winners (which may have been minor
names 10 years ago) are implicitly selected. This materially inflates
reported strategy returns.

New pipeline:
  - universe_history.py reconstructs per-ticker membership intervals by
    walking Wikipedia's "Selected changes" table backward from today.
  - research/fetch_historical.py downloads prices for all 848 tickers
    that were ever members (Yahoo returns ~675 of them; ~170 fully
    delisted names are unavailable, so a partial survivorship bias remains).
  - research/pit_backtest.py masks prices to NaN outside membership
    windows so strategies naturally cannot select non-members.
  - research/strategies_plus.py adds RecoveryMomentumPlus (generalized
    Recovery+Momentum with configurable weighting / blend / regime hook)
    and an EnsembleStrategy.
  - research/optimize.py runs five experiments: bias drift, hyperparameter
    sweep (2016-2022 train / 2023-2026 test), SPY MA regime filter,
    weighting schemes, and an uncorrelated-config ensemble.

Headline finding: the biased backtest reports 40.9% CAGR for
recovery_mom_top10 over 2016-2026; the point-in-time version reports
22.4% (vs 14.0% SPY buy-and-hold). True edge is ~8pp CAGR, not ~27pp.
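
For orientation, the masking step amounts to something like the sketch below (simplified; the real implementation lives in universe_history.mask_prices, and the interval format shown here is an assumption):

```python
import pandas as pd

def mask_to_membership(prices: pd.DataFrame, intervals: dict) -> pd.DataFrame:
    """Set prices to NaN wherever a ticker was not an index member (illustrative sketch)."""
    masked = pd.DataFrame(float("nan"), index=prices.index, columns=prices.columns)
    for ticker, spans in intervals.items():        # assumed shape: {ticker: [(start, end_or_None), ...]}
        if ticker not in prices.columns:
            continue
        for start, end in spans:
            end = end or prices.index[-1]          # open-ended membership runs to the last date
            window = (prices.index >= pd.Timestamp(start)) & (prices.index <= pd.Timestamp(end))
            masked.loc[window, ticker] = prices.loc[window, ticker]
    return masked
```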

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:26:02 +08:00
2015b62104 Charge 5 CNY per A-share trade via per-market fee table
Add MARKET_FEES {us: 2, cn: 5} so the monitor and cron (auto) paths
automatically apply the correct local-currency fixed commission without
needing a per-strategy override. CLI --fixed-fee still wins when set
explicitly for auto; monitor now always resolves from the table so its
banner and each strategy sub-call agree.
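
A rough illustration of the resolution order (hypothetical helper for clarity; only MARKET_FEES itself comes from this commit):

```python
MARKET_FEES = {"us": 2.0, "cn": 5.0}  # fixed commission per trade, in local currency

def resolve_fixed_fee(market: str, cli_fixed_fee: float | None = None) -> float:
    # An explicit CLI --fixed-fee still wins; otherwise fall back to the per-market table.
    if cli_fixed_fee is not None:
        return cli_fixed_fee
    return MARKET_FEES.get(market, 0.0)
```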

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-11 13:32:41 +08:00
b2176b0c3e Record daily snapshot in cmd_evening for monitor NAV tracking
cmd_evening (used by the monitor path) only updated the simple daily_equity
dict, so daily_log had gaps on every monitor-driven day. Mirror cmd_auto's
pattern and call record_daily_snapshot so each strategy's NAV is recorded
every trading day, even when no trades execute.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-11 13:15:09 +08:00
20 changed files with 2412 additions and 8 deletions

1
data/sp500_history.json Normal file

File diff suppressed because one or more lines are too long


@@ -63,10 +63,11 @@ def _download(tickers: list[str], start: str, end: str | None = None,
     result = {}
     for field in fields:
         if field in raw.columns.get_level_values(0) if isinstance(raw.columns, pd.MultiIndex) else field in raw.columns:
-            if len(tickers) > 1:
-                result[field] = raw[field]
+            selected = raw[field]
+            if isinstance(selected, pd.Series):
+                result[field] = selected.to_frame(name=tickers[0])
             else:
-                result[field] = raw[field].to_frame(name=tickers[0])
+                result[field] = selected
         else:
             result[field] = pd.DataFrame()
     return result
@@ -83,10 +84,11 @@ def _download_period(tickers: list[str], period: str,
     result = {}
     for field in fields:
         if field in raw.columns.get_level_values(0) if isinstance(raw.columns, pd.MultiIndex) else field in raw.columns:
-            if len(tickers) > 1:
-                result[field] = raw[field]
+            selected = raw[field]
+            if isinstance(selected, pd.Series):
+                result[field] = selected.to_frame(name=tickers[0])
             else:
-                result[field] = raw[field].to_frame(name=tickers[0])
+                result[field] = selected
         else:
             result[field] = pd.DataFrame()
     return result
@@ -103,6 +105,66 @@ def _clean(data: pd.DataFrame) -> pd.DataFrame:
    return data
def _clean_market_data(data: pd.DataFrame, field: str) -> pd.DataFrame:
"""Clean market data while preserving volume gaps."""
good = data.columns[data.notna().mean() > 0.5]
dropped = set(data.columns) - set(good)
if dropped:
print(f"--- Dropped {len(dropped)} tickers with >50% missing data ---")
data = data[good]
if field == "volume":
return data
return data.ffill().dropna(how="all")
def _merge_market_panel(existing: pd.DataFrame | None, new_data: pd.DataFrame) -> pd.DataFrame:
"""Merge new data into an existing cached panel, preserving old columns and dates."""
if existing is None or existing.empty:
merged = new_data.copy()
elif new_data.empty:
merged = existing.copy()
else:
merged = existing.combine_first(new_data)
merged.loc[new_data.index, new_data.columns] = new_data
merged = merged.sort_index()
merged = merged[~merged.index.duplicated(keep="last")]
return merged
def update_market_data(market: str, tickers: list[str], fields: list[str]) -> dict[str, pd.DataFrame]:
"""Download, clean, persist, and return market data panels for requested Yahoo fields."""
field_aliases = {
"close": "Close",
"open": "Open",
"high": "High",
"low": "Low",
"volume": "Volume",
}
normalized_fields = []
yahoo_fields = []
for field in fields:
normalized = field.lower()
if normalized not in field_aliases:
raise ValueError(f"Unsupported market data field: {field}")
normalized_fields.append(normalized)
yahoo_fields.append(field_aliases[normalized])
os.makedirs(DATA_DIR, exist_ok=True)
start = (datetime.now() - timedelta(days=365 * 10)).strftime("%Y-%m-%d")
downloaded = _download(tickers, start=start, fields=yahoo_fields)
cleaned = {}
for normalized, yahoo_field in zip(normalized_fields, yahoo_fields):
data = _clean_market_data(downloaded.get(yahoo_field, pd.DataFrame()), normalized)
existing = load(market, normalized)
data = _merge_market_panel(existing, data)
path = _data_path(market, normalized)
data.to_csv(path)
print(f"--- Saved {data.shape[0]} days x {data.shape[1]} tickers to {path} ---")
cleaned[normalized] = data
return cleaned
def update(market: str, tickers: list[str],
           with_open: bool = False) -> pd.DataFrame | tuple[pd.DataFrame, pd.DataFrame]:
    """


@@ -0,0 +1,376 @@
# US High-Alpha Research Design
**Date:** 2026-04-17
## Goal
Build a research framework for US `long-only` equity strategies that uses only free or already-accessible data, avoids lookahead and survivorship traps as much as the available data allows, and can rank candidate strategy families over `1/2/3/5/10y` windows. The objective is not to manufacture the single highest backtest CAGR, but to identify strategy families whose alpha survives realistic liquidity filters, transaction costs, and point-in-time constraints.
## Constraints
- Data sources must be free or already accessible from the current project environment.
- Portfolio construction must be `long-only`.
- The US research universe may extend beyond the S&P 500 into a broader US stock pool, but all conclusions must clearly distinguish between:
- `strict` results from a point-in-time-clean universe.
- `exploratory` results from a wider free-data universe that is not fully point-in-time-clean.
- All signals must use only information available at the time of decision.
- The framework must explicitly guard against:
- survivorship bias
- lookahead bias
- static industry-label leakage
- microcap and illiquidity contamination
## Success Criteria
The framework is successful if it produces:
1. A unified research and backtest pipeline for US strategies.
2. A ranked comparison of `3-5` high-value strategy families across `1/2/3/5/10y`.
3. Metrics that go beyond headline CAGR, including:
- `CAGR`
- `Sharpe`
- `Sortino`
- `MaxDD`
- `Calmar`
- `Turnover`
- `Average positions`
- `Median ADV usage`
- `Subperiod stability`
4. Tiered interpretation of results:
- `Tier A`: realistic and tradable under tighter liquidity assumptions
- `Tier B`: strong alpha but lower capacity
- `Tier C`: attractive only under loose assumptions and not suitable as a production candidate
Any strategy that reports near-`50% CAGR` must also explain:
- which market regime contributed most of the return
- whether performance depends on low-liquidity or small-cap tails
- whether results survive after removing the most extreme tail names
## Research Philosophy
This project should prefer honest, repeatable alpha discovery over spectacular but fragile backtests. Under the current constraints, a `10y 50% CAGR` should be treated as an upper-end outcome that may appear in selective windows, not as a baseline expectation. The more realistic goal is to find strategies that are strong over `3/5y`, still meaningfully outperform over `10y`, and remain robust after tightening assumptions.
## Strategy Families
The research effort will focus on four strategy families.
### 1. Earnings Drift Proxy
Target the post-information-repricing phase after major company-specific events. This is conceptually the highest-alpha family, but also the most dependent on event data quality.
Primary implementation order:
- use free historical earnings date data if it is stable enough
- otherwise fall back to price-and-volume-defined event proxies
Core signal ingredients:
- strong post-event excess return over `1-3` days
- abnormal volume
- gap that does not immediately fill
- price holding near short- and medium-term highs after the event
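A minimal sketch of such a price-and-volume event proxy, assuming only daily close and volume panels (the window lengths and thresholds below are placeholders, not decided defaults):

```python
import numpy as np
import pandas as pd

def event_proxy_flags(close: pd.DataFrame, volume: pd.DataFrame,
                      ret_window: int = 3, vol_window: int = 60,
                      ret_threshold: float = 0.05, vol_multiple: float = 2.0) -> pd.DataFrame:
    """Flag days where a strong multi-day return coincides with abnormal volume."""
    event_return = close.pct_change(ret_window)
    median_volume = volume.rolling(vol_window, min_periods=vol_window).median()
    abnormal_volume = volume / median_volume.replace(0, np.nan)
    flags = (event_return > ret_threshold) & (abnormal_volume > vol_multiple)
    return flags.shift(1).fillna(False).astype(bool)  # tradable no earlier than t+1
```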
### 2. Breakout After Compression
Target stocks that transition from low-volatility congestion into sustained trend expansion. This is the cleanest strategy family to implement with free daily OHLCV data and is the best first candidate for a strict production-grade pipeline.
Core signal ingredients:
- proximity to `120d` or `252d` highs
- volatility compression over the prior `20-40` trading days
- rising dollar volume
- positive relative strength versus market and industry proxies
### 3. Gap-and-Go / High-Volume Continuation
Target the second phase of move continuation after abnormal return and volume shocks rather than blindly chasing the first event day.
Core signal ingredients:
- abnormal `1d` or `3d` return
- abnormal volume versus trailing `60d`
- post-event price holding above the event anchor
- subsequent breakout continuation
This family has high potential upside but is more sensitive to cost assumptions and market regime.
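A hedged sketch of these ingredients (thresholds are placeholders, and the anchor-hold logic is one possible encoding rather than a committed design):

```python
import numpy as np
import pandas as pd

def gap_continuation_score(close: pd.DataFrame, volume: pd.DataFrame,
                           vol_window: int = 60, hold_window: int = 5) -> pd.DataFrame:
    """Score names that shocked up on abnormal volume and still hold above the event-day close."""
    event_return = close.pct_change()
    abnormal_volume = volume / volume.rolling(vol_window, min_periods=vol_window).median().replace(0, np.nan)
    shock = (event_return > 0.04) & (abnormal_volume > 2.0)     # candidate event day
    event_anchor = close.where(shock).ffill(limit=hold_window)  # event close carried forward a few days
    holding = close >= event_anchor                             # missing anchor compares as False
    score = (event_return.rank(axis=1, pct=True)
             + abnormal_volume.rank(axis=1, pct=True)).where(holding)
    return score.shift(1)                                       # tradable no earlier than t+1
```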
### 4. Regime-Gated Cross-Sectional Alpha
Use broad market and industry-state filters to improve the hit rate of the other strategy families and provide a lower-volatility baseline alpha engine.
Core signal ingredients:
- market risk-on versus risk-off state
- industry ETF leadership
- relative strength
- recovery from drawdowns
- trend quality
- near-`52w` high behavior
- price/volume confirmation
This family is not expected to produce the highest standalone CAGR, but it is expected to improve robustness and reduce participation in hostile environments.
## Prioritization
Recommended implementation order:
1. `Breakout After Compression`
2. `Regime-Gated Cross-Sectional Alpha`
3. `Gap-and-Go / High-Volume Continuation`
4. `Earnings Drift Proxy` only after validating free event-data quality
Rationale:
- `Breakout After Compression` is the most implementable and least ambiguous with free data.
- `Regime-Gated Cross-Sectional Alpha` provides a shared control layer for the rest of the framework.
- `Gap-and-Go` has higher upside but also higher sensitivity to assumptions.
- `Earnings Drift Proxy` is theoretically powerful but should not become the project bottleneck if free event history is incomplete.
## Data Layer
The framework needs a richer data layer than the current `close/open` setup.
### Required price fields
Daily US market data should support at least:
- `open`
- `high`
- `low`
- `close`
- `volume`
This is required to define:
- real breakouts
- gap events
- volatility compression
- abnormal dollar volume
### Required ETF layer
Add stable market and industry ETFs for regime and leadership analysis, at minimum:
- `SPY`
- `QQQ`
- `IWM`
- `MDY`
- `XLF`
- `XLK`
- `XLI`
- `XLV`
- `XLY`
- `XLP`
- `XLE`
- `XLU`
- `XLRE`
- `XLB`
- `SOXX`
- `IGV`
- `SMH`
### Universe modes
The framework must support two explicit modes.
#### Strict mode
Use point-in-time-clean universe membership, initially based on the existing PIT S&P 500 machinery in the repository. This is the baseline for formal, defensible results.
#### Exploratory mode
Use a wider free-data US stock pool to search for stronger alpha patterns. These results are useful for idea generation but must be labeled as exploratory unless later promoted into a point-in-time-clean setup.
## Universe Construction Rules
The tradable universe must be computed daily from lagged information.
### Daily eligibility rules
Each stock may enter the candidate set only if all required conditions hold as of `t-1`:
- enough listing history exists to compute the strategy lookbacks
- enough valid volume observations exist
- minimum lagged price threshold is met
- minimum lagged dollar-volume threshold is met
Representative defaults:
- `close[t-1] > 5`
- `median_dollar_volume_60d[t-1] > $20M` in `strict` mode
- `median_dollar_volume_60d[t-1] > $5M` in `exploratory` mode
- `>= 252` valid trading days before eligibility
- `>= 40` valid volume days in the trailing `60d`
Thresholds should be strategy-specific and tunable in robustness sweeps.
### Industry mapping
Do not use today's static sector labels to explain historical behavior. For historical regime and industry alignment, prefer PIT-safe proxies such as rolling correlation or beta to industry ETFs over `63/126d` windows.
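A minimal sketch of the correlation-based proxy for a single stock (the helper name is illustrative; the `126d` window matches the range above):

```python
import pandas as pd

def rolling_industry_correlation(stock_returns: pd.Series, etf_close: pd.DataFrame,
                                 window: int = 126) -> pd.DataFrame:
    """Trailing correlation of one stock's daily returns to each industry ETF."""
    etf_returns = etf_close.pct_change()
    return etf_returns.rolling(window, min_periods=window).corr(stock_returns)

# The date-t industry proxy is the ETF with the highest trailing correlation,
# computed from data through t and applied no earlier than t+1, e.g.:
#   proxy = rolling_industry_correlation(stock_rets, etf_close).shift(1).idxmax(axis=1)
```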
## Anti-Lookahead Rules
The framework must enforce the following rules consistently.
1. Signals computed using `t` daily bars may only be traded no earlier than `t+1`.
2. If an event is effectively published after market close, it becomes tradable no earlier than the next trading day after publication.
3. Rolling inputs for liquidity, volatility, and breakout logic must use complete lagged windows with explicit timing semantics.
4. Cross-sectional ranking must happen only within the daily eligible universe.
5. Universe membership, filters, and factor normalization must be applied before portfolio selection, not after.
## Execution Convention
Default execution convention:
- observe data through `t` close
- compute signal after the `t` close
- trade at `t+1`
The framework may compare `t+1 open` and `t+1 close` execution variants if the data path supports both, but the default research baseline should be conservative and consistent.
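As a minimal illustration of this convention with daily bars (the helper name is illustrative; it mirrors the shift-by-one pattern used in the repository's signal code):

```python
import pandas as pd

def next_day_execution_returns(target_weights: pd.DataFrame, close: pd.DataFrame) -> pd.Series:
    """Weights decided after the t close earn their first close-to-close return on t+1."""
    daily_returns = close.pct_change()
    held_weights = target_weights.shift(1)   # decision made at the t close, position held on t+1
    return (daily_returns * held_weights).sum(axis=1)
```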
## Backtest and Evaluation Framework
Every strategy family must run through a single pipeline that:
1. loads required market data
2. constructs the daily eligible universe
3. computes regime filters
4. computes strategy scores or event states
5. builds a `long-only` portfolio
6. applies transaction costs
7. reports `1/2/3/5/10y` windows
8. records robustness diagnostics
### Portfolio defaults
Initial baseline settings:
- `long-only`
- concentrated books such as `top 5`, `top 10`, `top 20`
- start with `equal weight`
- add `inverse-vol` weighting only as a secondary comparison
Equal-weight concentrated portfolios should be the first baseline because they are harder to over-engineer than adaptive weighting schemes.
### Required robustness checks
Any strategy candidate that looks strong must automatically be re-run under:
- tighter liquidity thresholds
- fewer and more positions
- higher trading costs
- different rebalance frequencies
- exclusion of the lowest-liquidity or smallest-cap tail
Only strategies that survive these perturbations should be promoted to `Tier A`.
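A hypothetical sketch of the perturbation grid; `run_candidate` stands in for whatever re-run entrypoint the pipeline exposes, and the grid values are placeholders:

```python
import itertools

PERTURBATIONS = {
    "cost_bps": [10, 20, 40],       # higher trading costs
    "top_n": [5, 10, 20],           # fewer and more positions
    "min_adv": [5e6, 2e7, 5e7],     # tighter liquidity thresholds
    "rebal_freq": [10, 21],         # different rebalance frequencies
}

def robustness_grid(run_candidate) -> list[dict]:
    """Re-run one candidate under every perturbation combination and collect its metrics."""
    keys = list(PERTURBATIONS)
    rows = []
    for combo in itertools.product(*(PERTURBATIONS[k] for k in keys)):
        params = dict(zip(keys, combo))
        rows.append({**params, **run_candidate(**params)})
    return rows
```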
## Repository Changes
The following repository changes are required.
### New modules
#### `research/us_universe.py`
Responsibilities:
- build daily tradable-universe masks
- support `strict` and `exploratory` modes
- enforce lagged eligibility rules
#### `data_manager.py` extension or new `market_data.py`
Responsibilities:
- support daily US `OHLCV`
- support ETF data updates
- preserve existing price-loading workflows where practical
#### `research/regime_filters.py`
Responsibilities:
- market risk-on/risk-off filters
- ETF leadership signals
- breadth and relative-strength helpers
#### `research/event_factors.py`
Responsibilities:
- breakout-compression scores
- gap-continuation scores
- high-volume continuation logic
- earnings-drift proxy logic
#### `research/us_alpha_pipeline.py`
Responsibilities:
- orchestrate end-to-end research runs
- load data
- build universe masks
- run strategy families
- produce windowed rankings
- label output as `strict` or `exploratory`
#### `research/us_alpha_report.py`
Responsibilities:
- format tables and CSV outputs
- summarize results by family and horizon
- support markdown export if needed
## Research Phasing
The implementation should be split into two phases.
### Phase 1
Build the strict, defensible research backbone:
- PIT S&P 500 universe
- OHLCV data support
- ETF regime filters
- `Breakout After Compression`
- `Regime-Gated Cross-Sectional Alpha`
- `Gap-and-Go / High-Volume Continuation`
- unified backtest and reporting pipeline
This phase should produce a clean research system that is difficult to fool with future information.
### Phase 2
Expand into higher-upside exploratory research:
- wider US stock universe
- broader signal scanning
- stronger CAGR search
- explicit exploratory labeling
This phase is for alpha discovery, not for making final claims about unbiased production performance.
## Recommended Output
The finished framework should produce:
- a repeatable research entrypoint for US alpha studies
- CSV outputs for `1/2/3/5/10y` windows
- a ranked table of strategy families
- tier classification for candidates
- notes on where near-`50% CAGR` outcomes come from and whether they remain credible after tightening assumptions
## Non-Goals
This project does not aim to:
- promise stable `10y 50% CAGR`
- claim a fully point-in-time-clean all-US-stock universe from free data alone
- optimize to a single headline metric at the expense of realism
- treat exploratory full-market scans as production-quality evidence
## Key Decision
The core design choice is to build infrastructure that minimizes self-deception first, and only then search for extreme CAGR outcomes. Any other order is likely to produce attractive but unreliable results.

0
research/__init__.py Normal file

34
research/event_factors.py Normal file

@@ -0,0 +1,34 @@
import numpy as np
import pandas as pd
TRAILING_HIGH_WINDOW = 60
COMPRESSION_WINDOW = 20
VOLUME_WINDOW = 20
def breakout_after_compression_score(
close: pd.DataFrame,
high: pd.DataFrame,
low: pd.DataFrame,
volume: pd.DataFrame,
) -> pd.DataFrame:
"""Score breakout setups and shift the result so it is tradable next day."""
close = close.sort_index()
high = high.reindex(index=close.index, columns=close.columns).sort_index()
low = low.reindex(index=close.index, columns=close.columns).sort_index()
volume = volume.reindex(index=close.index, columns=close.columns).sort_index()
trailing_high = close.rolling(TRAILING_HIGH_WINDOW, min_periods=TRAILING_HIGH_WINDOW).max()
proximity_to_high = close / trailing_high.replace(0, np.nan)
recent_high = high.rolling(COMPRESSION_WINDOW, min_periods=COMPRESSION_WINDOW).max()
recent_low = low.rolling(COMPRESSION_WINDOW, min_periods=COMPRESSION_WINDOW).min()
recent_mid = (recent_high + recent_low) / 2
compressed_range = -((recent_high - recent_low) / recent_mid.replace(0, np.nan))
median_volume = volume.rolling(VOLUME_WINDOW, min_periods=VOLUME_WINDOW).median()
abnormal_volume = volume / median_volume.replace(0, np.nan)
score = proximity_to_high + compressed_range + abnormal_volume
return score.shift(1)


@@ -0,0 +1,152 @@
"""
Fetch price history for all tickers that were ever S&P 500 members — including
delisted ones — and save to data/us_pit.csv. This is the foundation for a
survivorship-bias-free backtest.
NOTE: Yahoo Finance no longer serves price data for many fully-delisted tickers
(bankruptcies, old mergers). Those are silently skipped. The result is still
a major improvement over "today's S&P 500 extrapolated 10 years back", but it
is NOT a perfect point-in-time dataset — only a dataset where the universe
mask is correct at each date. A subset of worst-outcome tickers (e.g., ABK,
ACAS) will be missing entirely. This caveat is documented in the run summary.
"""
import os
from datetime import datetime, timedelta
import pandas as pd
import yfinance as yf
import universe_history as uh
DATA_DIR = "data"
OUT_PATH = os.path.join(DATA_DIR, "us_pit.csv")
YEARS = 10
BATCH_SIZE = 50
def _field_out_paths() -> dict[str, str]:
return {
"Close": os.path.join(DATA_DIR, "us_pit_close.csv"),
"High": os.path.join(DATA_DIR, "us_pit_high.csv"),
"Low": os.path.join(DATA_DIR, "us_pit_low.csv"),
"Volume": os.path.join(DATA_DIR, "us_pit_volume.csv"),
}
def fetch_all_historical(force: bool = False) -> pd.DataFrame:
os.makedirs(DATA_DIR, exist_ok=True)
intervals = uh.load_sp500_history()
tickers = uh.all_tickers_ever(intervals) + ["SPY"]
tickers = sorted(set(tickers))
existing = None
if os.path.exists(OUT_PATH) and not force:
existing = pd.read_csv(OUT_PATH, index_col=0, parse_dates=True)
missing = [t for t in tickers if t not in existing.columns]
if not missing:
# Just append latest dates
last_date = existing.index[-1]
if (datetime.now() - last_date.to_pydatetime()).days < 2:
print(f"--- us_pit.csv already up to date: {existing.shape} ---")
return existing
tickers = list(existing.columns)
start = (last_date + timedelta(days=1)).strftime("%Y-%m-%d")
print(f"--- Appending new dates from {start} for {len(tickers)} tickers ---")
new = _download_batched(tickers, start=start)
if new is not None and not new.empty:
combined = pd.concat([existing, new]).sort_index()
combined = combined[~combined.index.duplicated(keep="last")]
combined.to_csv(OUT_PATH)
print(f"--- Saved {combined.shape} to {OUT_PATH} ---")
return combined
return existing
else:
print(f"--- Have {existing.shape[1]} cols; need {len(missing)} more ---")
tickers = missing
start = (datetime.now() - timedelta(days=365 * YEARS)).strftime("%Y-%m-%d")
new = _download_batched(tickers, start=start)
if existing is not None and new is not None and not new.empty:
combined = pd.concat([existing, new.reindex(existing.index)], axis=1)
# Add any new rows from `new` not in existing
new_only_idx = new.index.difference(existing.index)
if len(new_only_idx) > 0:
combined_new = new.loc[new_only_idx].reindex(columns=combined.columns)
combined = pd.concat([combined, combined_new]).sort_index()
else:
combined = new
combined.to_csv(OUT_PATH)
print(f"--- Saved {combined.shape} to {OUT_PATH} ---")
return combined
def fetch_all_historical_ohlcv(force: bool = False) -> dict[str, pd.DataFrame]:
os.makedirs(DATA_DIR, exist_ok=True)
intervals = uh.load_sp500_history()
tickers = uh.all_tickers_ever(intervals) + ["SPY"]
tickers = sorted(set(tickers))
start = (datetime.now() - timedelta(days=365 * YEARS)).strftime("%Y-%m-%d")
panels = _download_batched_fields(tickers, start=start, fields=["Close", "High", "Low", "Volume"])
if not panels:
raise RuntimeError("No PIT OHLCV data downloaded")
close = panels["Close"]
close.to_csv(OUT_PATH)
print(f"--- Saved {close.shape} to {OUT_PATH} ---")
result: dict[str, pd.DataFrame] = {"close": close}
for field, path in _field_out_paths().items():
panel = panels[field]
panel.to_csv(path)
print(f"--- Saved {panel.shape} to {path} ---")
result[field.lower()] = panel
return result
def _download_batched(tickers: list[str], start: str) -> pd.DataFrame | None:
panels = _download_batched_fields(tickers, start=start, fields=["Close"])
if not panels:
return None
return panels["Close"]
def _download_batched_fields(
tickers: list[str],
start: str,
fields: list[str],
) -> dict[str, pd.DataFrame]:
frames = {field: [] for field in fields}
n = len(tickers)
for i in range(0, n, BATCH_SIZE):
batch = tickers[i:i + BATCH_SIZE]
print(f" [{i}/{n}] fetching {len(batch)} tickers...", flush=True)
try:
raw = yf.download(batch, start=start, auto_adjust=True,
progress=False, threads=True)
if raw.empty:
continue
for field in fields:
if isinstance(raw.columns, pd.MultiIndex):
panel = raw[field]
else:
panel = raw[[field]].rename(columns={field: batch[0]})
panel = panel.dropna(axis=1, how="all")
if not panel.empty:
frames[field].append(panel)
except Exception as e:
print(f" batch failed: {e}")
result = {}
for field, field_frames in frames.items():
if field_frames:
panel = pd.concat(field_frames, axis=1).sort_index()
panel = panel.loc[:, ~panel.columns.duplicated()]
result[field] = panel
else:
result[field] = pd.DataFrame()
return result
if __name__ == "__main__":
fetch_all_historical()

299
research/optimize.py Normal file

@@ -0,0 +1,299 @@
"""
End-to-end optimization study for the US recovery+momentum strategy family,
run on a point-in-time (survivorship-bias-mitigated) S&P 500 universe.
Experiments:
E1 — Baseline drift: biased vs point-in-time universe, current top10 params.
E2 — Hyperparameter sweep with 2016-2022 train / 2023-2026 test split.
E3 — SPY MA200 regime filter (compare base vs filtered).
E4 — Weighting schemes: equal vs inverse-vol vs rank.
E5 — Ensemble of top-3 uncorrelated configs.
Usage: uv run python -m research.optimize
"""
import os
import numpy as np
import pandas as pd
import data_manager
import research.pit_backtest as pit
from research.strategies_plus import (EnsembleStrategy, RecoveryMomentumPlus,
spy_ma200_filter)
from strategies.recovery_momentum import RecoveryMomentumStrategy
DATA_DIR = "data"
BIASED_CSV = os.path.join(DATA_DIR, "us.csv")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def slice_period(df: pd.DataFrame, start: str | None, end: str | None) -> pd.DataFrame:
out = df
if start:
out = out[out.index >= start]
if end:
out = out[out.index <= end]
return out
def run_strategy(strategy, prices, benchmark=None, regime_filter=None,
fixed_fee: float = 0.0) -> pd.Series:
return pit.backtest(
strategy=strategy, prices=prices, initial_capital=10_000,
transaction_cost=0.001, fixed_fee=fixed_fee,
benchmark=benchmark, regime_filter=regime_filter,
)
# ---------------------------------------------------------------------------
# Experiment 1: bias drift
# ---------------------------------------------------------------------------
def exp1_bias_drift(pit_prices_masked: pd.DataFrame) -> pd.DataFrame:
print("\n" + "=" * 90)
print("E1 — Biased universe vs Point-in-time universe (recovery_mom_top10)")
print("=" * 90)
rows = []
# Biased: current 503 tickers extrapolated backward
biased = pd.read_csv(BIASED_CSV, index_col=0, parse_dates=True)
# Use same date range as PIT for a fair comparison
common_start = max(biased.index[0], pit_prices_masked.index[0])
common_end = min(biased.index[-1], pit_prices_masked.index[-1])
biased_window = slice_period(biased, str(common_start.date()), str(common_end.date()))
pit_window = slice_period(pit_prices_masked, str(common_start.date()), str(common_end.date()))
# Drop non-ticker columns (SPY is in PIT but not in the masked tickers)
biased_tickers = [c for c in biased_window.columns if c != "SPY"]
pit_tickers = [c for c in pit_window.columns if c != "SPY"]
# Use RecoveryMomentumPlus with identical defaults to recovery_mom_top10.
# The original strategy uses na_option="bottom" which misranks NaN-masked
# data (non-members appear "top"); the Plus variant uses na_option="keep".
strat = RecoveryMomentumPlus(top_n=10) # defaults match RecoveryMomentumStrategy
eq_biased = run_strategy(strat, biased_window[biased_tickers])
eq_pit = run_strategy(RecoveryMomentumPlus(top_n=10), pit_window[pit_tickers])
rows.append(pit.summarize(eq_biased, name="recovery_mom_top10 (BIASED)"))
rows.append(pit.summarize(eq_pit, name="recovery_mom_top10 (POINT-IN-TIME)"))
# Benchmark: SPY buy-and-hold in same window
if "SPY" in biased_window.columns:
spy_bh = (biased_window["SPY"] / biased_window["SPY"].iloc[0]) * 10_000
rows.append(pit.summarize(spy_bh, name="SPY buy-and-hold"))
for r in rows:
print(pit.fmt_row(r))
return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Experiment 2: hyperparameter sweep with train/test split
# ---------------------------------------------------------------------------
def exp2_sweep(pit_masked: pd.DataFrame) -> pd.DataFrame:
print("\n" + "=" * 90)
print("E2 — Hyperparameter sweep (train: 2016-2022, test: 2023-2026)")
print("=" * 90)
tickers = [c for c in pit_masked.columns if c != "SPY"]
prices = pit_masked[tickers]
train = slice_period(prices, "2016-04-01", "2022-12-31")
test = slice_period(prices, "2023-01-01", None)
grid = []
for top_n in [5, 8, 10, 15]:
for rec_win in [42, 63, 126]:
for rec_w in [0.3, 0.5, 0.7]:
for rebal in [10, 21]:
grid.append(dict(top_n=top_n, recovery_window=rec_win,
rec_weight=rec_w, rebal_freq=rebal))
results = []
for i, cfg in enumerate(grid):
strat_train = RecoveryMomentumPlus(**cfg)
eq_tr = run_strategy(strat_train, train)
sum_tr = pit.summarize(eq_tr, name="train")
strat_test = RecoveryMomentumPlus(**cfg)
eq_te = run_strategy(strat_test, test)
sum_te = pit.summarize(eq_te, name="test")
results.append({
**cfg,
"train_CAGR": sum_tr["CAGR"],
"train_Sharpe": sum_tr["Sharpe"],
"train_MaxDD": sum_tr["MaxDD"],
"test_CAGR": sum_te["CAGR"],
"test_Sharpe": sum_te["Sharpe"],
"test_MaxDD": sum_te["MaxDD"],
"test_Calmar": sum_te["Calmar"],
})
if (i + 1) % 10 == 0 or i == len(grid) - 1:
print(f" ... {i+1}/{len(grid)} configs evaluated")
df = pd.DataFrame(results)
df = df.sort_values("test_Sharpe", ascending=False)
# Print top 10 by TEST Sharpe, then top 10 by TRAIN Sharpe to see overfit gap
print("\n --- Top 10 by TEST Sharpe (out-of-sample, 2023-2026) ---")
disp_cols = ["top_n", "recovery_window", "rec_weight", "rebal_freq",
"train_Sharpe", "test_Sharpe", "train_CAGR", "test_CAGR",
"test_MaxDD", "test_Calmar"]
print(df.head(10)[disp_cols].to_string(index=False,
formatters={"train_Sharpe": "{:.2f}".format, "test_Sharpe": "{:.2f}".format,
"train_CAGR": "{:.1%}".format, "test_CAGR": "{:.1%}".format,
"test_MaxDD": "{:.1%}".format, "test_Calmar": "{:.2f}".format}))
print("\n --- Top 10 by TRAIN Sharpe (for comparison / overfit check) ---")
df_tr = df.sort_values("train_Sharpe", ascending=False)
print(df_tr.head(10)[disp_cols].to_string(index=False,
formatters={"train_Sharpe": "{:.2f}".format, "test_Sharpe": "{:.2f}".format,
"train_CAGR": "{:.1%}".format, "test_CAGR": "{:.1%}".format,
"test_MaxDD": "{:.1%}".format, "test_Calmar": "{:.2f}".format}))
return df
# ---------------------------------------------------------------------------
# Experiment 3: regime filter
# ---------------------------------------------------------------------------
def exp3_regime(pit_masked: pd.DataFrame) -> pd.DataFrame:
print("\n" + "=" * 90)
print("E3 — SPY MA200 regime filter (out-of-sample 2023-2026)")
print("=" * 90)
tickers = [c for c in pit_masked.columns if c != "SPY"]
# Compute MA from FULL history so the filter is warmed up before 2023.
spy_full = pit_masked["SPY"].dropna() if "SPY" in pit_masked.columns else None
filt_full_200 = spy_ma200_filter(spy_full, ma_window=200) if spy_full is not None else None
filt_full_150 = spy_ma200_filter(spy_full, ma_window=150) if spy_full is not None else None
test = slice_period(pit_masked, "2023-01-01", None)
prices = test[tickers]
filt = filt_full_200.reindex(test.index).fillna(False).astype(bool) if filt_full_200 is not None else None
filt_150 = filt_full_150.reindex(test.index).fillna(False).astype(bool) if filt_full_150 is not None else None
rows = []
base = RecoveryMomentumPlus(top_n=10)
rows.append(pit.summarize(run_strategy(base, prices), name="top10 (no filter)"))
rows.append(pit.summarize(run_strategy(RecoveryMomentumPlus(top_n=10), prices,
regime_filter=filt),
name="top10 + SPY>MA200 filter"))
rows.append(pit.summarize(run_strategy(RecoveryMomentumPlus(top_n=10), prices,
regime_filter=filt_150),
name="top10 + SPY>MA150 filter"))
for r in rows:
print(pit.fmt_row(r))
return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Experiment 4: weighting schemes
# ---------------------------------------------------------------------------
def exp4_weighting(pit_masked: pd.DataFrame) -> pd.DataFrame:
print("\n" + "=" * 90)
print("E4 — Weighting schemes (out-of-sample 2023-2026, top_n=10)")
print("=" * 90)
tickers = [c for c in pit_masked.columns if c != "SPY"]
test = slice_period(pit_masked[tickers], "2023-01-01", None)
rows = []
for w in ["equal", "inv_vol", "rank"]:
strat = RecoveryMomentumPlus(top_n=10, weighting=w)
eq = run_strategy(strat, test)
rows.append(pit.summarize(eq, name=f"top10 weighting={w}"))
for r in rows:
print(pit.fmt_row(r))
return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Experiment 5: ensemble
# ---------------------------------------------------------------------------
def exp5_ensemble(pit_masked: pd.DataFrame, sweep_df: pd.DataFrame) -> pd.DataFrame:
print("\n" + "=" * 90)
print("E5 — Ensemble of 3 uncorrelated top configs (out-of-sample 2023-2026)")
print("=" * 90)
tickers = [c for c in pit_masked.columns if c != "SPY"]
test = slice_period(pit_masked[tickers], "2023-01-01", None)
# Pick top-20 by test_Sharpe, then greedily keep picks whose equity curves
# correlate < 0.9 with already-kept picks.
top20 = sweep_df.sort_values("test_Sharpe", ascending=False).head(20)
curves = []
components = []
for _, row in top20.iterrows():
cfg = dict(top_n=int(row["top_n"]),
recovery_window=int(row["recovery_window"]),
rec_weight=float(row["rec_weight"]),
rebal_freq=int(row["rebal_freq"]))
strat = RecoveryMomentumPlus(**cfg)
eq = run_strategy(strat, test)
if any(eq.pct_change().corr(c.pct_change()) > 0.9 for c in curves):
continue
curves.append(eq)
components.append((RecoveryMomentumPlus(**cfg), 1.0))
if len(components) >= 3:
break
print(f" Selected {len(components)} uncorrelated configs for ensemble:")
for strat, _ in components:
print(f" top_n={strat.top_n}, rec_win={strat.recovery_window}, "
f"rec_w={strat.rec_weight}, rebal={strat.rebal_freq}")
ens = EnsembleStrategy(components)
eq_ens = run_strategy(ens, test)
rows = [
pit.summarize(curves[i], name=f" component {i+1}") for i in range(len(curves))
]
rows.append(pit.summarize(eq_ens, name="ENSEMBLE (equal-weight)"))
# Also ensemble + regime filter (compute MA from full history)
if "SPY" in pit_masked.columns:
spy_full = pit_masked["SPY"].dropna()
filt = spy_ma200_filter(spy_full).reindex(test.index).fillna(False).astype(bool)
eq_ens_reg = run_strategy(EnsembleStrategy(components), test, regime_filter=filt)
rows.append(pit.summarize(eq_ens_reg, name="ENSEMBLE + SPY>MA200 filter"))
for r in rows:
print(pit.fmt_row(r))
return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
print("Loading point-in-time price data...")
raw = pit.load_pit_prices()
print(f" Raw (union) shape: {raw.shape}, {raw.index[0].date()}{raw.index[-1].date()}")
masked = pit.pit_universe(raw)
# Sanity: how many ticker-days are masked out?
total = masked.size
valid = masked.notna().sum().sum()
print(f" Point-in-time valid ticker-days: {valid:,} / {total:,} ({valid/total*100:.1f}%)")
daily_universe = masked.notna().sum(axis=1)
print(f" Universe size per day: min={daily_universe.min()}, median={int(daily_universe.median())}, max={daily_universe.max()}")
e1 = exp1_bias_drift(masked)
sweep = exp2_sweep(masked)
e3 = exp3_regime(masked)
e4 = exp4_weighting(masked)
e5 = exp5_ensemble(masked, sweep)
# Save sweep for inspection
out = os.path.join(DATA_DIR, "research_sweep.csv")
sweep.to_csv(out, index=False)
print(f"\n Full sweep saved to {out}")
if __name__ == "__main__":
main()

125
research/pit_backtest.py Normal file

@@ -0,0 +1,125 @@
"""
Point-in-time backtest runner.
Key idea: mask price data to NaN outside S&P 500 membership windows before
passing to the strategy. The strategy's signal computations then naturally
exclude non-members — no refactoring of strategies required.
Caveat: a stock joining the index has no signal for ~252 days after joining
(rolling windows need non-NaN warm-up). This is conservative but unbiased.
"""
import os
import numpy as np
import pandas as pd
import metrics
import universe_history as uh
DATA_DIR = "data"
PIT_CSV = os.path.join(DATA_DIR, "us_pit.csv")
# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------
def load_pit_prices() -> pd.DataFrame:
"""Load the full historical S&P 500 price matrix (delisted included)."""
if not os.path.exists(PIT_CSV):
raise FileNotFoundError(
f"{PIT_CSV} not found. Run `uv run python -m research.fetch_historical` first."
)
df = pd.read_csv(PIT_CSV, index_col=0, parse_dates=True)
return df.sort_index()
def pit_universe(prices: pd.DataFrame) -> pd.DataFrame:
"""Return prices masked to S&P 500 membership at each date (NaN outside)."""
intervals = uh.load_sp500_history()
return uh.mask_prices(prices, intervals)
# ---------------------------------------------------------------------------
# Backtest engine (mirrors main.backtest but accepts masked prices)
# ---------------------------------------------------------------------------
def backtest(
strategy,
prices: pd.DataFrame,
initial_capital: float = 10_000,
transaction_cost: float = 0.001,
fixed_fee: float = 0.0,
benchmark: pd.Series | None = None,
regime_filter: pd.Series | None = None,
) -> pd.Series:
"""
Vectorized backtest with optional regime filter.
`regime_filter`: boolean series aligned to prices.index. True → be in the
market (use strategy weights). False → go to cash. When None, always invested.
"""
weights = strategy.generate_signals(prices)
weights = weights.reindex(prices.index).fillna(0.0)
if regime_filter is not None:
rf = regime_filter.reindex(prices.index).fillna(False).astype(float)
weights = weights.mul(rf, axis=0)
daily_returns = prices.pct_change().fillna(0.0)
portfolio_returns = (daily_returns * weights).sum(axis=1)
turnover = weights.diff().abs().sum(axis=1).fillna(0.0)
portfolio_returns -= turnover * transaction_cost
if fixed_fee > 0:
weight_changes = weights.diff().fillna(0.0)
n_trades = (weight_changes.abs() > 1e-8).sum(axis=1)
equity_running = (1 + portfolio_returns).cumprod() * initial_capital
fee_impact = (n_trades * fixed_fee) / equity_running.shift(1).fillna(initial_capital)
portfolio_returns -= fee_impact
equity = (1 + portfolio_returns).cumprod() * initial_capital
return equity
# ---------------------------------------------------------------------------
# Metrics helper
# ---------------------------------------------------------------------------
def summarize(equity: pd.Series, name: str = "") -> dict:
"""Return a dict of key performance metrics (no printing)."""
eq = equity.dropna()
if len(eq) < 2:
return {"name": name, "error": "insufficient data"}
daily = eq.pct_change().dropna()
total_return = eq.iloc[-1] / eq.iloc[0] - 1
years = (eq.index[-1] - eq.index[0]).days / 365.25
cagr = (eq.iloc[-1] / eq.iloc[0]) ** (1 / years) - 1 if years > 0 else 0.0
vol = daily.std() * np.sqrt(252)
sharpe = (daily.mean() * 252) / vol if vol > 0 else 0.0
downside = daily[daily < 0].std() * np.sqrt(252)
sortino = (daily.mean() * 252) / downside if downside > 0 else 0.0
dd = (eq / eq.cummax() - 1).min()
calmar = cagr / abs(dd) if dd < 0 else 0.0
return {
"name": name,
"CAGR": cagr,
"Sharpe": sharpe,
"Sortino": sortino,
"MaxDD": dd,
"Calmar": calmar,
"TotalRet": total_return,
"Vol": vol,
}
def fmt_row(r: dict) -> str:
return (f" {r['name']:<38s} "
f"CAGR={r['CAGR']*100:>6.1f}% "
f"Sharpe={r['Sharpe']:>5.2f} "
f"Sortino={r['Sortino']:>5.2f} "
f"MaxDD={r['MaxDD']*100:>6.1f}% "
f"Calmar={r['Calmar']:>5.2f} "
f"Total={r['TotalRet']*100:>7.1f}%")


@@ -0,0 +1,23 @@
import pandas as pd
LONG_MA_WINDOW = 200
RS_WINDOW = 63
def build_regime_filter(etf_close: pd.DataFrame, market_col: str = "SPY") -> pd.Series:
"""Return a next-day tradable regime flag based on market trend and ETF leadership."""
prices = etf_close.sort_index()
if market_col not in prices.columns:
raise KeyError(f"{market_col} not found in etf_close")
market = prices[market_col]
market_ma = market.rolling(LONG_MA_WINDOW, min_periods=LONG_MA_WINDOW).mean()
market_ok = market.gt(market_ma)
rs = prices.pct_change(RS_WINDOW, fill_method=None)
non_market_rs = rs.drop(columns=[market_col], errors="ignore")
leader_ok = non_market_rs.gt(rs[market_col], axis=0).any(axis=1)
regime = (market_ok & leader_ok).astype(bool)
return regime.shift(1, fill_value=False)

150
research/strategies_plus.py Normal file

@@ -0,0 +1,150 @@
"""
Optimization variants of RecoveryMomentumStrategy.
Four dimensions explored:
1. Hyperparameters (top_n, recovery_window, mom_lookback, rebal_freq, weights)
2. Regime filter: zero-out weights when SPY < MA200
3. Weighting scheme: equal / inverse-vol / rank-weighted
4. Ensemble: weighted blend of multiple strategies
All strategies follow the same Strategy protocol (generate_signals → weights DF).
"""
import numpy as np
import pandas as pd
from strategies.base import Strategy
# ---------------------------------------------------------------------------
# Generalized Recovery+Momentum strategy
# ---------------------------------------------------------------------------
class RecoveryMomentumPlus(Strategy):
"""
Recovery + momentum composite with configurable blend, weighting, and
regime filter hooks.
Parameters
----------
recovery_window : int
Lookback for the recovery factor (price / rolling min - 1).
mom_lookback : int
Long-horizon momentum window total length.
mom_skip : int
Short-term reversal skip for momentum.
rebal_freq : int
Trading-day rebalance interval.
top_n : int
Number of stocks selected each rebalance.
rec_weight : float in [0, 1]
Weight of recovery factor in composite rank blend (mom_weight = 1 - rec_weight).
weighting : {"equal", "inv_vol", "rank"}
Portfolio weighting scheme for the selected top_n.
vol_window : int
Volatility lookback when weighting="inv_vol".
"""
def __init__(self,
recovery_window: int = 63,
mom_lookback: int = 252,
mom_skip: int = 21,
rebal_freq: int = 21,
top_n: int = 10,
rec_weight: float = 0.5,
weighting: str = "equal",
vol_window: int = 60):
if weighting not in ("equal", "inv_vol", "rank"):
raise ValueError(f"weighting must be equal|inv_vol|rank, got {weighting!r}")
self.recovery_window = recovery_window
self.mom_lookback = mom_lookback
self.mom_skip = mom_skip
self.rebal_freq = rebal_freq
self.top_n = top_n
self.rec_weight = rec_weight
self.weighting = weighting
self.vol_window = vol_window
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
# Factors
recovery = data / data.rolling(self.recovery_window).min() - 1
momentum = data.shift(self.mom_skip).pct_change(self.mom_lookback - self.mom_skip)
rec_rank = recovery.rank(axis=1, pct=True, na_option="keep")
mom_rank = momentum.rank(axis=1, pct=True, na_option="keep")
composite = self.rec_weight * rec_rank + (1 - self.rec_weight) * mom_rank
# Top-N selection
rank = composite.rank(axis=1, ascending=False, na_option="bottom")
n_valid = composite.notna().sum(axis=1)
enough = n_valid >= self.top_n
top_mask = (rank <= self.top_n) & enough.values.reshape(-1, 1)
# Weighting within top-N
if self.weighting == "equal":
raw = top_mask.astype(float)
elif self.weighting == "rank":
# Higher composite → higher weight within top-N
ranked_score = composite.where(top_mask, 0.0)
raw = ranked_score
elif self.weighting == "inv_vol":
# Use inverse realized-volatility as weights within top-N
rets = data.pct_change()
vol = rets.rolling(self.vol_window).std()
inv_vol = 1.0 / vol.replace(0, np.nan)
raw = inv_vol.where(top_mask, 0.0).fillna(0.0)
row_sums = raw.sum(axis=1).replace(0, np.nan)
signals = raw.div(row_sums, axis=0).fillna(0.0)
# Rebalance
warmup = max(self.mom_lookback, self.recovery_window, self.vol_window)
rebal_mask = pd.Series(False, index=data.index)
rebal_indices = list(range(warmup, len(data), self.rebal_freq))
rebal_mask.iloc[rebal_indices] = True
signals[~rebal_mask] = np.nan
signals = signals.ffill().fillna(0.0)
signals.iloc[:warmup] = 0.0
return signals.shift(1).fillna(0.0)
# ---------------------------------------------------------------------------
# Ensemble
# ---------------------------------------------------------------------------
class EnsembleStrategy(Strategy):
"""
Weighted blend of several sub-strategies. Each sub-strategy produces a
weight matrix; we linearly combine them. The result still sums to (at
most) 1 per row since each sub-strategy does.
"""
def __init__(self, components: list[tuple[Strategy, float]]):
total = sum(w for _, w in components)
self.components = [(s, w / total) for s, w in components]
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
out = None
for strat, w in self.components:
sig = strat.generate_signals(data).mul(w)
if out is None:
out = sig
else:
# Align columns (should be identical since same data passed)
out = out.add(sig, fill_value=0.0)
return out
# ---------------------------------------------------------------------------
# Regime filter helper
# ---------------------------------------------------------------------------
def spy_ma200_filter(spy: pd.Series, ma_window: int = 200) -> pd.Series:
"""
Boolean Series: True when SPY close > SPY MA(ma_window), shifted by 1 to
avoid lookahead. Use as `regime_filter=...` in pit_backtest.backtest().
"""
ma = spy.rolling(ma_window, min_periods=ma_window).mean()
signal = (spy > ma).fillna(False)
return signal.shift(1).fillna(False)


@@ -0,0 +1,156 @@
import numpy as np
import pandas as pd
import data_manager
import universe_history as uh
from research.event_factors import breakout_after_compression_score
from research.regime_filters import build_regime_filter
from research.us_alpha_report import summarize_equity_window
from research.us_universe import build_tradable_mask
MIN_PRICE = 5.0
MIN_DOLLAR_VOLUME = 20_000_000.0
MIN_HISTORY_DAYS = 252
MIN_VALID_VOLUME_DAYS = 40
LIQUIDITY_WINDOW = 60
TREND_WINDOW = 126
RECOVERY_WINDOW = 63
HIGH_PROX_WINDOW = 126
ETF_TICKERS = ["SPY", "QQQ", "IWM", "MDY", "XLK", "XLF", "XLI", "XLV"]
def _price_rank_blend_score(close: pd.DataFrame) -> pd.DataFrame:
"""Simple price-only cross-sectional blend, shifted for next-day trading."""
trend = close.pct_change(TREND_WINDOW, fill_method=None)
recovery = close / close.rolling(RECOVERY_WINDOW, min_periods=RECOVERY_WINDOW).min() - 1
high_proximity = close / close.rolling(HIGH_PROX_WINDOW, min_periods=HIGH_PROX_WINDOW).max().replace(0, np.nan)
trend_rank = trend.rank(axis=1, pct=True, na_option="keep")
recovery_rank = recovery.rank(axis=1, pct=True, na_option="keep")
high_rank = high_proximity.rank(axis=1, pct=True, na_option="keep")
return ((trend_rank + recovery_rank + high_rank) / 3.0).shift(1)
def _build_equal_weight_portfolio(
score: pd.DataFrame,
tradable_mask: pd.DataFrame,
regime_filter: pd.Series,
top_n: int,
) -> pd.DataFrame:
"""Build equal-weight top-n long-only weights from aligned scores."""
aligned_score = score.reindex(index=tradable_mask.index, columns=tradable_mask.columns)
eligible_score = aligned_score.where(tradable_mask)
rank = eligible_score.rank(axis=1, ascending=False, na_option="bottom", method="first")
selected = (rank <= top_n) & eligible_score.notna()
selected = selected & regime_filter.reindex(tradable_mask.index, fill_value=False).to_numpy().reshape(-1, 1)
raw = selected.astype(float)
row_sums = raw.sum(axis=1).replace(0.0, np.nan)
return raw.div(row_sums, axis=0).fillna(0.0)
def _equity_curve(close: pd.DataFrame, weights: pd.DataFrame) -> pd.Series:
"""Convert daily weights into a simple close-to-close equity curve."""
returns = close.pct_change(fill_method=None).fillna(0.0)
portfolio_returns = (returns * weights).sum(axis=1)
return (1.0 + portfolio_returns).cumprod()
def _read_panel_csv(path: str) -> pd.DataFrame:
return pd.read_csv(path, index_col=0, parse_dates=True).sort_index()
def load_saved_pit_market_data(data_dir: str = "data", prefix: str = "us_pit") -> dict[str, pd.DataFrame]:
"""Load saved PIT OHLCV panels from disk."""
panels = {}
for field in ("close", "high", "low", "volume"):
panels[field] = _read_panel_csv(f"{data_dir}/{prefix}_{field}.csv")
return panels
def load_saved_etf_close(data_dir: str = "data", market: str = "us_etf") -> pd.DataFrame:
"""Load saved ETF closes or populate them on demand."""
path = f"{data_dir}/{market}.csv"
try:
return _read_panel_csv(path)
except FileNotFoundError:
original_data_dir = data_manager.DATA_DIR
try:
data_manager.DATA_DIR = data_dir
return data_manager.update_market_data(market, ETF_TICKERS, ["close"])["close"]
finally:
data_manager.DATA_DIR = original_data_dir
def run_alpha_pipeline(
market_data,
etf_close,
pit_membership=None,
windows=(1, 2, 3, 5, 10),
top_n=10,
) -> pd.DataFrame:
"""Run a lightweight strict US alpha pipeline and summarize trailing windows."""
close = market_data["close"].sort_index()
high = market_data["high"].reindex(index=close.index, columns=close.columns).sort_index()
low = market_data["low"].reindex(index=close.index, columns=close.columns).sort_index()
volume = market_data["volume"].reindex(index=close.index, columns=close.columns).sort_index()
tradable_mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=pit_membership,
min_price=MIN_PRICE,
min_dollar_volume=MIN_DOLLAR_VOLUME,
min_history_days=MIN_HISTORY_DAYS,
min_valid_volume_days=MIN_VALID_VOLUME_DAYS,
liquidity_window=LIQUIDITY_WINDOW,
)
regime_filter = build_regime_filter(etf_close).reindex(close.index, fill_value=False)
strategy_scores = {
"breakout_regime": breakout_after_compression_score(close, high, low, volume),
"rank_blend_regime": _price_rank_blend_score(close),
}
summary_rows = []
for strategy_name, score in strategy_scores.items():
weights = _build_equal_weight_portfolio(score, tradable_mask, regime_filter, top_n)
equity = _equity_curve(close, weights)
for window_years in windows:
summary_rows.append(summarize_equity_window(equity, strategy_name, window_years))
return pd.DataFrame(summary_rows)
def run_saved_pit_alpha_pipeline(
data_dir: str = "data",
windows=(1, 2, 3, 5, 10),
top_n: int = 10,
) -> pd.DataFrame:
"""Load saved PIT OHLCV inputs and run the strict alpha pipeline."""
market_data = load_saved_pit_market_data(data_dir=data_dir)
etf_close = load_saved_etf_close(data_dir=data_dir)
intervals = uh.load_sp500_history()
pit_membership = uh.membership_mask(
market_data["close"].index,
intervals=intervals,
tickers=list(market_data["close"].columns),
)
return run_alpha_pipeline(
market_data=market_data,
etf_close=etf_close,
pit_membership=pit_membership,
windows=windows,
top_n=top_n,
)
def main() -> None:
summary = run_saved_pit_alpha_pipeline()
print(summary.to_string(index=False))
if __name__ == "__main__":
main()


@@ -0,0 +1,37 @@
import numpy as np
import pandas as pd
TRADING_DAYS_PER_YEAR = 252
def summarize_equity_window(equity: pd.Series, strategy: str, window_years: int | float) -> dict:
"""Summarize a strategy equity curve over a trailing trading-day window."""
window_days = max(int(window_years * TRADING_DAYS_PER_YEAR), 1)
clean_equity = equity.dropna()
if len(clean_equity) < window_days + 1:
return {
"strategy": strategy,
"window_years": window_years,
"CAGR": np.nan,
"Sharpe": np.nan,
"MaxDD": np.nan,
"TotalRet": np.nan,
}
window_equity = clean_equity.tail(window_days + 1)
daily = window_equity.pct_change(fill_method=None).dropna()
total_ret = window_equity.iloc[-1] / window_equity.iloc[0] - 1
years = len(daily) / TRADING_DAYS_PER_YEAR
cagr = (window_equity.iloc[-1] / window_equity.iloc[0]) ** (1 / years) - 1 if years > 0 else np.nan
vol = daily.std() * np.sqrt(TRADING_DAYS_PER_YEAR)
sharpe = (daily.mean() * TRADING_DAYS_PER_YEAR) / vol if vol > 0 else 0.0
max_dd = (window_equity / window_equity.cummax() - 1).min()
return {
"strategy": strategy,
"window_years": window_years,
"CAGR": cagr,
"Sharpe": sharpe,
"MaxDD": max_dd,
"TotalRet": total_ret,
}

53
research/us_universe.py Normal file

@@ -0,0 +1,53 @@
import pandas as pd
def build_tradable_mask(
close: pd.DataFrame,
volume: pd.DataFrame,
pit_membership: pd.DataFrame | None,
min_price: float,
min_dollar_volume: float,
min_history_days: int,
min_valid_volume_days: int,
liquidity_window: int = 60,
) -> pd.DataFrame:
"""Build a point-in-time tradable universe mask using only lagged inputs."""
close = close.sort_index()
volume = volume.reindex(index=close.index, columns=close.columns).sort_index()
if pit_membership is None:
pit_mask = pd.DataFrame(True, index=close.index, columns=close.columns)
else:
pit_mask = pit_membership.reindex(
index=close.index,
columns=close.columns,
fill_value=False,
)
pit_mask = pit_mask.where(pit_mask.notna(), False).astype(bool)
eligible_close = close.where(pit_mask)
eligible_volume = volume.where(pit_mask)
lagged_close = eligible_close.shift(1)
lagged_volume = eligible_volume.shift(1)
lagged_dollar_volume = lagged_close * lagged_volume
price_ok = lagged_close.gt(min_price)
liquidity_ok = (
lagged_dollar_volume.rolling(window=liquidity_window, min_periods=1).median().gt(min_dollar_volume)
)
history_ok = (
lagged_close.notna()
.rolling(window=min_history_days, min_periods=min_history_days)
.sum()
.ge(min_history_days)
)
valid_volume_ok = (
lagged_dollar_volume.notna()
.rolling(window=liquidity_window, min_periods=1)
.sum()
.ge(min_valid_volume_days)
)
mask = price_ok & liquidity_ok & history_ok & valid_volume_ok
mask = mask & pit_mask
return mask.astype(bool)

118
tests/test_alpha_signals.py Normal file

@@ -0,0 +1,118 @@
import unittest
import warnings
import numpy as np
import pandas as pd
class AlphaSignalTests(unittest.TestCase):
def test_build_regime_filter_requires_market_trend_and_non_market_leader(self):
from research.regime_filters import build_regime_filter
dates = pd.date_range("2023-01-01", periods=260, freq="D")
spy = pd.Series([100.0 + i for i in range(260)], index=dates)
qqq_leader = pd.Series([100.0 + 1.4 * i for i in range(260)], index=dates)
xlu = pd.Series([100.0 + 0.2 * i for i in range(260)], index=dates)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
bullish = build_regime_filter(pd.DataFrame({"SPY": spy, "QQQ": qqq_leader, "XLU": xlu}))
qqq_laggard = pd.Series([100.0 + 0.5 * i for i in range(260)], index=dates)
no_leader = build_regime_filter(pd.DataFrame({"SPY": spy, "QQQ": qqq_laggard, "XLU": xlu}))
self.assertEqual(len(caught), 0)
self.assertFalse(bool(bullish.iloc[199]))
self.assertTrue(bool(bullish.iloc[-1]))
self.assertFalse(bool(no_leader.iloc[-1]))
def test_build_regime_filter_handles_internal_missing_prices_without_warnings(self):
from research.regime_filters import build_regime_filter
dates = pd.date_range("2023-01-01", periods=260, freq="D")
spy = pd.Series([100.0 + i for i in range(260)], index=dates)
qqq = pd.Series([100.0 + 1.4 * i for i in range(260)], index=dates)
qqq.iloc[120] = np.nan
etf_close = pd.DataFrame({"SPY": spy, "QQQ": qqq, "XLU": 100.0}, index=dates)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
regime = build_regime_filter(etf_close)
self.assertEqual(len(caught), 0)
self.assertEqual(str(regime.dtype), "bool")
def test_breakout_after_compression_score_is_shifted_and_rewards_breakout_profile(self):
from research.event_factors import breakout_after_compression_score
dates = pd.date_range("2024-01-01", periods=80, freq="D")
aaa_close = [100.0 + i for i in range(60)] + [159.0 + 0.05 * i for i in range(20)]
bbb_close = [100.0 + i for i in range(60)] + [150.0 - i for i in range(20)]
close = pd.DataFrame({"AAA": aaa_close, "BBB": bbb_close}, index=dates)
high = pd.DataFrame(
{
"AAA": [value + 0.4 for value in aaa_close],
"BBB": [value + 4.0 for value in bbb_close],
},
index=dates,
)
low = pd.DataFrame(
{
"AAA": [value - 0.4 for value in aaa_close],
"BBB": [value - 4.0 for value in bbb_close],
},
index=dates,
)
volume = pd.DataFrame(
{
"AAA": [1_000.0] * 79 + [1_000.0],
"BBB": [1_000.0] * 80,
},
index=dates,
)
volume.loc[dates[-2], "AAA"] = 6_000.0
shifted_result = breakout_after_compression_score(close, high, low, volume)
self.assertGreater(
shifted_result.loc[dates[-1], "AAA"],
shifted_result.loc[dates[-1], "BBB"],
)
changed_last_day = close.copy()
changed_last_day_high = high.copy()
changed_last_day_low = low.copy()
changed_last_day_volume = volume.copy()
changed_last_day.loc[dates[-1], "AAA"] = 120.0
changed_last_day_high.loc[dates[-1], "AAA"] = 130.0
changed_last_day_low.loc[dates[-1], "AAA"] = 110.0
changed_last_day_volume.loc[dates[-1], "AAA"] = 20_000.0
last_day_changed_result = breakout_after_compression_score(
changed_last_day,
changed_last_day_high,
changed_last_day_low,
changed_last_day_volume,
)
self.assertEqual(
shifted_result.loc[dates[-1], "AAA"],
last_day_changed_result.loc[dates[-1], "AAA"],
)
def test_breakout_after_compression_score_keeps_float_output_when_denominators_hit_zero(self):
from research.event_factors import breakout_after_compression_score
dates = pd.date_range("2024-01-01", periods=70, freq="D")
close = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
high = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
low = pd.DataFrame({"AAA": [10.0] * 70}, index=dates)
volume = pd.DataFrame({"AAA": [0.0] * 70}, index=dates)
score = breakout_after_compression_score(close, high, low, volume)
self.assertEqual(str(score.dtypes["AAA"]), "float64")
self.assertTrue(pd.isna(score.iloc[-1]["AAA"]))
if __name__ == "__main__":
unittest.main()


@@ -0,0 +1,46 @@
import tempfile
import unittest
from pathlib import Path
from unittest import mock
import pandas as pd
from research import fetch_historical
class FetchHistoricalTests(unittest.TestCase):
def test_fetch_all_historical_ohlcv_writes_field_specific_csvs(self):
dates = pd.to_datetime(["2024-01-02", "2024-01-03"])
raw = pd.DataFrame(
{
("Close", "AAA"): [10.0, 11.0],
("Close", "BBB"): [20.0, 21.0],
("High", "AAA"): [10.5, 11.5],
("High", "BBB"): [20.5, 21.5],
("Low", "AAA"): [9.5, 10.5],
("Low", "BBB"): [19.5, 20.5],
("Volume", "AAA"): [1000.0, 1100.0],
("Volume", "BBB"): [2000.0, 2100.0],
},
index=dates,
)
raw.columns = pd.MultiIndex.from_tuples(raw.columns)
with tempfile.TemporaryDirectory() as tmpdir:
with mock.patch.object(fetch_historical, "DATA_DIR", tmpdir):
with mock.patch.object(fetch_historical, "OUT_PATH", str(Path(tmpdir) / "us_pit.csv")):
with mock.patch("research.fetch_historical.uh.load_sp500_history", return_value={"AAA": [[None, None]], "BBB": [[None, None]]}):
with mock.patch("research.fetch_historical.uh.all_tickers_ever", return_value=["AAA", "BBB"]):
with mock.patch("research.fetch_historical.yf.download", return_value=raw):
panels = fetch_historical.fetch_all_historical_ohlcv(force=True)
self.assertEqual(set(panels.keys()), {"close", "high", "low", "volume"})
self.assertTrue((Path(tmpdir) / "us_pit.csv").exists())
self.assertTrue((Path(tmpdir) / "us_pit_close.csv").exists())
self.assertTrue((Path(tmpdir) / "us_pit_high.csv").exists())
self.assertTrue((Path(tmpdir) / "us_pit_low.csv").exists())
self.assertTrue((Path(tmpdir) / "us_pit_volume.csv").exists())
if __name__ == "__main__":
unittest.main()

tests/test_market_data.py Normal file

@@ -0,0 +1,144 @@
import tempfile
import unittest
from pathlib import Path
from unittest import mock
import pandas as pd
import data_manager
class UpdateMarketDataTests(unittest.TestCase):
def test_update_market_data_accepts_lowercase_fields_and_does_not_fill_volume(self):
dates = pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"])
raw = pd.DataFrame(
{
("Close", "AAA"): [10.0, 11.0, 12.0],
("Close", "BBB"): [20.0, float("nan"), 22.0],
("Open", "AAA"): [9.5, 10.5, 11.5],
("Open", "BBB"): [19.5, 20.5, 21.5],
("High", "AAA"): [10.5, 11.5, 12.5],
("High", "BBB"): [20.5, 21.5, 22.5],
("Low", "AAA"): [9.0, 10.0, 11.0],
("Low", "BBB"): [19.0, 20.0, 21.0],
("Volume", "AAA"): [1000, 1100, 1200],
("Volume", "BBB"): [2000, float("nan"), 2200],
},
index=dates,
)
raw.columns = pd.MultiIndex.from_tuples(raw.columns)
with tempfile.TemporaryDirectory() as tmpdir:
with mock.patch.object(data_manager, "DATA_DIR", tmpdir):
with mock.patch("data_manager.yf.download", return_value=raw) as mocked_download:
panels = data_manager.update_market_data(
"us",
["AAA", "BBB"],
["close", "open", "high", "low", "volume"],
)
self.assertEqual(set(panels), {"close", "open", "high", "low", "volume"})
self.assertEqual(panels["close"].loc[dates[1], "BBB"], 20.0)
self.assertTrue(pd.isna(panels["volume"].loc[dates[1], "BBB"]))
self.assertTrue((Path(tmpdir) / "us.csv").exists())
self.assertTrue((Path(tmpdir) / "us_open.csv").exists())
self.assertTrue((Path(tmpdir) / "us_high.csv").exists())
self.assertTrue((Path(tmpdir) / "us_low.csv").exists())
self.assertTrue((Path(tmpdir) / "us_volume.csv").exists())
saved_high = pd.read_csv(Path(tmpdir) / "us_high.csv", index_col=0, parse_dates=True)
pd.testing.assert_frame_equal(saved_high, panels["high"], check_freq=False)
self.assertEqual(mocked_download.call_args.args[0], ["AAA", "BBB"])
self.assertEqual(mocked_download.call_args.kwargs["auto_adjust"], True)
self.assertIn("start", mocked_download.call_args.kwargs)
def test_update_market_data_rejects_unsupported_fields(self):
with tempfile.TemporaryDirectory() as tmpdir:
with mock.patch.object(data_manager, "DATA_DIR", tmpdir):
with self.assertRaisesRegex(ValueError, "Unsupported market data field: adjusted_close"):
data_manager.update_market_data("us", ["AAA"], ["adjusted_close"])
def test_update_market_data_preserves_existing_cache_columns_and_dates(self):
existing_dates = pd.to_datetime(["2024-01-01", "2024-01-02"])
new_dates = pd.to_datetime(["2024-01-02", "2024-01-03"])
existing_close = pd.DataFrame(
{
"AAA": [9.0, 10.0],
"CCC": [30.0, 31.0],
},
index=existing_dates,
)
downloaded_close = pd.DataFrame({"Close": [10.5, 11.5]}, index=new_dates)
with tempfile.TemporaryDirectory() as tmpdir:
existing_close.to_csv(Path(tmpdir) / "us.csv")
with mock.patch.object(data_manager, "DATA_DIR", tmpdir):
with mock.patch("data_manager.yf.download", return_value=downloaded_close):
panels = data_manager.update_market_data("us", ["AAA"], ["close"])
expected = pd.DataFrame(
{
"AAA": [9.0, 10.5, 11.5],
"CCC": [30.0, 31.0, float("nan")],
},
index=pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
)
saved_close = pd.read_csv(Path(tmpdir) / "us.csv", index_col=0, parse_dates=True)
pd.testing.assert_frame_equal(panels["close"], expected, check_freq=False)
pd.testing.assert_frame_equal(saved_close, expected, check_freq=False)
def test_update_market_data_volume_merge_can_clear_stale_cached_values(self):
existing_dates = pd.to_datetime(["2024-01-01", "2024-01-02"])
new_dates = pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"])
existing_volume = pd.DataFrame(
{
"AAA": [1000.0, 9999.0],
"CCC": [3000.0, 3100.0],
},
index=existing_dates,
)
downloaded_volume = pd.DataFrame({"Volume": [float("nan"), 1200.0, 1300.0]}, index=new_dates)
with tempfile.TemporaryDirectory() as tmpdir:
existing_volume.to_csv(Path(tmpdir) / "us_volume.csv")
with mock.patch.object(data_manager, "DATA_DIR", tmpdir):
with mock.patch("data_manager.yf.download", return_value=downloaded_volume):
panels = data_manager.update_market_data("us", ["AAA"], ["volume"])
expected = pd.DataFrame(
{
"AAA": [1000.0, float("nan"), 1200.0, 1300.0],
"CCC": [3000.0, 3100.0, float("nan"), float("nan")],
},
index=pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"]),
)
saved_volume = pd.read_csv(Path(tmpdir) / "us_volume.csv", index_col=0, parse_dates=True)
pd.testing.assert_frame_equal(panels["volume"], expected, check_freq=False)
pd.testing.assert_frame_equal(saved_volume, expected, check_freq=False)
def test_update_market_data_handles_single_ticker_multiindex_download(self):
dates = pd.to_datetime(["2024-01-02", "2024-01-03"])
raw = pd.DataFrame(
{
("Close", "AAA"): [10.0, 11.0],
("Volume", "AAA"): [1000.0, 1100.0],
},
index=dates,
)
raw.columns = pd.MultiIndex.from_tuples(raw.columns)
with tempfile.TemporaryDirectory() as tmpdir:
with mock.patch.object(data_manager, "DATA_DIR", tmpdir):
with mock.patch("data_manager.yf.download", return_value=raw):
panels = data_manager.update_market_data("us", ["AAA"], ["close", "volume"])
expected_close = pd.DataFrame({"AAA": [10.0, 11.0]}, index=dates)
expected_volume = pd.DataFrame({"AAA": [1000.0, 1100.0]}, index=dates)
pd.testing.assert_frame_equal(panels["close"], expected_close, check_freq=False)
pd.testing.assert_frame_equal(panels["volume"], expected_volume, check_freq=False)
if __name__ == "__main__":
unittest.main()


@@ -0,0 +1,164 @@
import unittest
from pathlib import Path
from unittest import mock
import pandas as pd
class USAlphaPipelineTests(unittest.TestCase):
def test_build_equal_weight_portfolio_caps_holdings_under_ties(self):
from research.us_alpha_pipeline import _build_equal_weight_portfolio
dates = pd.date_range("2024-01-01", periods=2, freq="D")
score = pd.DataFrame(
{
"AAA": [0.9, 0.9],
"BBB": [0.9, 0.9],
"CCC": [0.9, 0.9],
},
index=dates,
)
tradable_mask = pd.DataFrame(True, index=dates, columns=score.columns)
regime = pd.Series([True, True], index=dates)
weights = _build_equal_weight_portfolio(score, tradable_mask, regime, top_n=2)
self.assertEqual(int((weights.iloc[-1] > 0).sum()), 2)
self.assertAlmostEqual(float(weights.iloc[-1].sum()), 1.0)
def test_equity_curve_uses_prior_day_weights_for_returns(self):
from research.us_alpha_pipeline import _equity_curve
dates = pd.date_range("2024-01-01", periods=3, freq="D")
close = pd.DataFrame({"AAA": [1.0, 2.0, 4.0]}, index=dates)
weights = pd.DataFrame({"AAA": [0.0, 1.0, 0.0]}, index=dates)
equity = _equity_curve(close, weights)
self.assertEqual(float(equity.iloc[1]), 2.0)
self.assertEqual(float(equity.iloc[2]), 2.0)
def test_summarize_equity_window_returns_nans_when_history_is_too_short(self):
from research.us_alpha_report import summarize_equity_window
dates = pd.date_range("2024-01-01", periods=10, freq="D")
equity = pd.Series([1.0 + 0.01 * i for i in range(10)], index=dates)
summary = summarize_equity_window(equity, "demo", window_years=1)
self.assertTrue(pd.isna(summary["CAGR"]))
self.assertTrue(pd.isna(summary["Sharpe"]))
self.assertTrue(pd.isna(summary["MaxDD"]))
self.assertTrue(pd.isna(summary["TotalRet"]))
def test_run_alpha_pipeline_returns_expected_strategy_summary(self):
from research.us_alpha_pipeline import run_alpha_pipeline
dates = pd.date_range("2023-01-01", periods=400, freq="D")
aaa_close = [50.0 + 0.20 * i for i in range(400)]
bbb_close = [55.0 + 0.12 * i for i in range(400)]
ccc_close = [60.0 + 0.05 * i for i in range(400)]
close = pd.DataFrame(
{
"AAA": aaa_close,
"BBB": bbb_close,
"CCC": ccc_close,
},
index=dates,
)
high = pd.DataFrame(
{
"AAA": [value + 0.5 for value in aaa_close],
"BBB": [value + 1.0 for value in bbb_close],
"CCC": [value + 1.5 for value in ccc_close],
},
index=dates,
)
low = pd.DataFrame(
{
"AAA": [value - 0.5 for value in aaa_close],
"BBB": [value - 1.0 for value in bbb_close],
"CCC": [value - 1.5 for value in ccc_close],
},
index=dates,
)
volume = pd.DataFrame(
{
"AAA": [1_500_000.0] * 400,
"BBB": [1_400_000.0] * 400,
"CCC": [1_300_000.0] * 400,
},
index=dates,
)
volume.loc[dates[-2], "AAA"] = 4_000_000.0
etf_close = pd.DataFrame(
{
"SPY": [300.0 + 0.8 * i for i in range(400)],
"QQQ": [280.0 + 1.1 * i for i in range(400)],
"XLF": [200.0 + 0.4 * i for i in range(400)],
},
index=dates,
)
market_data = {
"close": close,
"high": high,
"low": low,
"volume": volume,
}
summary = run_alpha_pipeline(
market_data=market_data,
etf_close=etf_close,
pit_membership=None,
windows=(1,),
top_n=2,
)
required_columns = {"strategy", "window_years", "CAGR", "Sharpe", "MaxDD", "TotalRet"}
self.assertTrue(required_columns.issubset(summary.columns))
self.assertEqual(set(summary["strategy"]), {"breakout_regime", "rank_blend_regime"})
self.assertEqual(set(summary["window_years"]), {1})
self.assertEqual(len(summary), 2)
self.assertTrue(summary[["CAGR", "Sharpe", "MaxDD", "TotalRet"]].notna().all().all())
def test_run_saved_pit_alpha_pipeline_reads_saved_inputs(self):
from research.us_alpha_pipeline import run_saved_pit_alpha_pipeline
dates = pd.date_range("2024-01-01", periods=320, freq="D")
close = pd.DataFrame(
{
"AAA": [50.0 + 0.2 * i for i in range(320)],
"BBB": [40.0 + 0.1 * i for i in range(320)],
},
index=dates,
)
high = close + 1.0
low = close - 1.0
volume = pd.DataFrame({"AAA": [2_500_000.0] * 320, "BBB": [2_000_000.0] * 320}, index=dates)
etf_close = pd.DataFrame(
{"SPY": [300.0 + 0.8 * i for i in range(320)], "QQQ": [280.0 + 1.1 * i for i in range(320)]},
index=dates,
)
with self.subTest("saved_inputs"):
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
close.to_csv(Path(tmpdir) / "us_pit_close.csv")
high.to_csv(Path(tmpdir) / "us_pit_high.csv")
low.to_csv(Path(tmpdir) / "us_pit_low.csv")
volume.to_csv(Path(tmpdir) / "us_pit_volume.csv")
etf_close.to_csv(Path(tmpdir) / "us_etf.csv")
intervals = {"AAA": [[None, None]], "BBB": [[None, None]]}
with mock.patch("research.us_alpha_pipeline.uh.load_sp500_history", return_value=intervals):
summary = run_saved_pit_alpha_pipeline(data_dir=tmpdir, windows=(1,), top_n=1)
self.assertEqual(set(summary["strategy"]), {"breakout_regime", "rank_blend_regime"})
if __name__ == "__main__":
unittest.main()

tests/test_us_universe.py Normal file

@@ -0,0 +1,213 @@
import unittest
import warnings
import pandas as pd
class BuildTradableMaskTests(unittest.TestCase):
def test_build_tradable_mask_uses_only_lagged_price_and_liquidity_inputs(self):
from research.us_universe import build_tradable_mask
dates = pd.date_range("2024-01-01", periods=4, freq="D")
close = pd.DataFrame({"AAA": [4.0, 10.0, 10.0, 10.0]}, index=dates)
volume = pd.DataFrame({"AAA": [float("nan"), 200.0, 200.0, 200.0]}, index=dates)
mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=None,
min_price=5.0,
min_dollar_volume=1000.0,
min_history_days=2,
min_valid_volume_days=2,
liquidity_window=2,
)
expected = pd.DataFrame({"AAA": [False, False, False, True]}, index=dates, dtype=bool)
pd.testing.assert_frame_equal(mask, expected)
def test_build_tradable_mask_uses_only_lagged_history(self):
from research.us_universe import build_tradable_mask
dates = pd.date_range("2024-01-01", periods=4, freq="D")
close = pd.DataFrame({"AAA": [10.0, float("nan"), 10.0, 10.0]}, index=dates)
volume = pd.DataFrame({"AAA": [200.0, 200.0, 200.0, 200.0]}, index=dates)
mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=None,
min_price=5.0,
min_dollar_volume=1_000.0,
min_history_days=2,
min_valid_volume_days=1,
liquidity_window=1,
)
expected = pd.DataFrame({"AAA": [False, False, False, False]}, index=dates, dtype=bool)
pd.testing.assert_frame_equal(mask, expected)
def test_build_tradable_mask_requires_membership_history_before_first_eligible_day(self):
from research.us_universe import build_tradable_mask
dates = pd.date_range("2024-01-01", periods=4, freq="D")
close = pd.DataFrame({"AAA": [10.0, 10.0, 10.0, 10.0]}, index=dates)
volume = pd.DataFrame({"AAA": [200.0, 200.0, 200.0, 200.0]}, index=dates)
pit_membership = pd.DataFrame({"AAA": [False, False, True, True]}, index=dates)
mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=pit_membership,
min_price=5.0,
min_dollar_volume=1_000.0,
min_history_days=1,
min_valid_volume_days=1,
liquidity_window=1,
)
expected = pd.DataFrame({"AAA": [False, False, False, True]}, index=dates, dtype=bool)
pd.testing.assert_frame_equal(mask, expected)
def test_build_tradable_mask_aligns_pit_membership_without_truthy_carryover(self):
from research.us_universe import build_tradable_mask
dates = pd.date_range("2024-01-01", periods=3, freq="D")
close = pd.DataFrame(
{
"AAA": [10.0, 10.0, 10.0],
"BBB": [12.0, 12.0, 12.0],
},
index=dates,
)
volume = pd.DataFrame(
{
"AAA": [1_000_000.0, 1_000_000.0, 1_000_000.0],
"BBB": [1_000_000.0, 1_000_000.0, 1_000_000.0],
},
index=dates,
)
pit_membership = pd.DataFrame(
{
"BBB": [True, True, False],
"CCC": [True, True, True],
},
index=pd.date_range("2024-01-02", periods=3, freq="D"),
)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=pit_membership,
min_price=5.0,
min_dollar_volume=1_000.0,
min_history_days=1,
min_valid_volume_days=1,
liquidity_window=1,
)
self.assertEqual(len(caught), 0)
expected = pd.DataFrame(
{
"AAA": [False, False, False],
"BBB": [False, False, True],
},
index=dates,
dtype=bool,
)
pd.testing.assert_frame_equal(mask, expected)
def test_build_tradable_mask_treats_missing_membership_cells_as_false(self):
from research.us_universe import build_tradable_mask
dates = pd.date_range("2024-01-01", periods=3, freq="D")
close = pd.DataFrame({"AAA": [10.0, 10.0, 10.0]}, index=dates)
volume = pd.DataFrame({"AAA": [1_000_000.0, 1_000_000.0, 1_000_000.0]}, index=dates)
pit_membership = pd.DataFrame(
{"AAA": [True, pd.NA, True]},
index=dates,
dtype="boolean",
)
mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=pit_membership,
min_price=5.0,
min_dollar_volume=1_000.0,
min_history_days=1,
min_valid_volume_days=1,
liquidity_window=1,
)
expected = pd.DataFrame({"AAA": [False, False, False]}, index=dates, dtype=bool)
pd.testing.assert_frame_equal(mask, expected)
def test_build_tradable_mask_uses_strict_thresholds(self):
from research.us_universe import build_tradable_mask
dates = pd.date_range("2024-01-01", periods=3, freq="D")
close = pd.DataFrame({"AAA": [5.0, 5.0, 5.0]}, index=dates)
volume = pd.DataFrame({"AAA": [300.0, 300.0, 300.0]}, index=dates)
mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=None,
min_price=5.0,
min_dollar_volume=1_000.0,
min_history_days=1,
min_valid_volume_days=1,
liquidity_window=1,
)
expected = pd.DataFrame({"AAA": [False, False, False]}, index=dates, dtype=bool)
pd.testing.assert_frame_equal(mask, expected)
def test_build_tradable_mask_uses_strict_dollar_volume_threshold(self):
from research.us_universe import build_tradable_mask
dates = pd.date_range("2024-01-01", periods=3, freq="D")
close = pd.DataFrame({"AAA": [8.0, 8.0, 8.0]}, index=dates)
volume = pd.DataFrame({"AAA": [125.0, 125.0, 125.0]}, index=dates)
mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=None,
min_price=5.0,
min_dollar_volume=1_000.0,
min_history_days=1,
min_valid_volume_days=1,
liquidity_window=1,
)
expected = pd.DataFrame({"AAA": [False, False, False]}, index=dates, dtype=bool)
pd.testing.assert_frame_equal(mask, expected)
def test_build_tradable_mask_requires_valid_dollar_volume_history(self):
from research.us_universe import build_tradable_mask
dates = pd.date_range("2024-01-01", periods=4, freq="D")
close = pd.DataFrame({"AAA": [10.0, float("nan"), 10.0, 10.0]}, index=dates)
volume = pd.DataFrame({"AAA": [200.0, 200.0, 200.0, 200.0]}, index=dates)
mask = build_tradable_mask(
close=close,
volume=volume,
pit_membership=None,
min_price=5.0,
min_dollar_volume=1_000.0,
min_history_days=1,
min_valid_volume_days=2,
liquidity_window=2,
)
expected = pd.DataFrame({"AAA": [False, False, False, False]}, index=dates, dtype=bool)
pd.testing.assert_frame_equal(mask, expected)
if __name__ == "__main__":
unittest.main()


@@ -48,6 +48,16 @@ from strategies.recovery_momentum import RecoveryMomentumStrategy
from strategies.trend_following import TrendFollowingStrategy
from universe import UNIVERSES
# ---------------------------------------------------------------------------
# Per-market fixed trading fees (per trade, in the market's local currency)
# ---------------------------------------------------------------------------
# cmd_monitor always resolves the fee from this table (falling back to
# --fixed-fee only for markets not listed here); cmd_auto uses the table as
# the default but still honours an explicit --fixed-fee passed on the CLI.
MARKET_FEES = {
"us": 2.0, # USD per trade
"cn": 5.0, # CNY per trade (A-share minimum commission)
}
# ---------------------------------------------------------------------------
# Strategy registry
# ---------------------------------------------------------------------------
@@ -520,6 +530,12 @@ def cmd_evening(args):
post_value = portfolio_value(state["holdings"], close_prices, state["cash"])
state["daily_equity"][trade_date] = round(post_value, 2)
# Record daily snapshot so daily_log stays complete even on no-trade days
eq_vals = list(state["daily_equity"].values())
prev_eq = eq_vals[-2] if len(eq_vals) >= 2 else state["initial_capital"]
record_daily_snapshot(state, trade_date, close_prices, exec_trades, prev_eq)
state["pending_trades"] = None state["pending_trades"] = None
state["last_evening"] = trade_date state["last_evening"] = trade_date
save_state(state, market, strategy_name) save_state(state, market, strategy_name)
@@ -1082,12 +1098,13 @@ def cmd_monitor(args):
print(f" MONITOR MODE — {len(markets)} market(s), " print(f" MONITOR MODE — {len(markets)} market(s), "
f"{len(strategies)} strategies each") f"{len(strategies)} strategies each")
print(f" Capital: ${args.capital:,.0f} | " print(f" Capital: ${args.capital:,.0f} | "
f"Fee: ${args.fixed_fee:.2f}/trade | "
f"Integer shares: {args.integer_shares}") f"Integer shares: {args.integer_shares}")
for mkt, sched in market_schedules.items(): for mkt, sched in market_schedules.items():
fee = MARKET_FEES.get(mkt, args.fixed_fee)
print(f" {sched['label']}:") print(f" {sched['label']}:")
print(f" Morning: {sched['morn_h']:02d}:{sched['morn_m']:02d} {sched['tz']}") print(f" Morning: {sched['morn_h']:02d}:{sched['morn_m']:02d} {sched['tz']}")
print(f" Evening: {sched['eve_h']:02d}:{sched['eve_m']:02d} {sched['tz']}") print(f" Evening: {sched['eve_h']:02d}:{sched['eve_m']:02d} {sched['tz']}")
print(f" Fixed fee: {fee:.2f}/trade")
print(f" Strategies: {', '.join(strategies)}") print(f" Strategies: {', '.join(strategies)}")
print(f"{'='*60}") print(f"{'='*60}")
@@ -1132,10 +1149,12 @@ def cmd_monitor(args):
f"{now_local.strftime('%Y-%m-%d %H:%M:%S %Z')}") f"{now_local.strftime('%Y-%m-%d %H:%M:%S %Z')}")
print(f"[monitor] {'='*55}") print(f"[monitor] {'='*55}")
market_fee = MARKET_FEES.get(market, args.fixed_fee)
for strat_name in strategies:
sub_args = copy.copy(args)
sub_args.strategy = strat_name
sub_args.market = market
sub_args.fixed_fee = market_fee
print(f"\n[monitor] --- {market.upper()}:{strat_name} ---") print(f"\n[monitor] --- {market.upper()}:{strat_name} ---")
try: try:
@@ -1289,8 +1308,10 @@ def cmd_auto(args):
integer_shares=args.integer_shares
)
# Fall back to per-market fee when the user didn't explicitly override
fixed_fee = args.fixed_fee if args.fixed_fee > 0 else MARKET_FEES.get(market, 0.0)
execute_trades(state, trades, close_prices,
tx_cost=args.tx_cost, fixed_fee=args.fixed_fee,
tx_cost=args.tx_cost, fixed_fee=fixed_fee,
trade_date=today_str, integer_shares=args.integer_shares)
post_value = portfolio_value(state["holdings"], close_prices, state["cash"])

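A small sketch of the fee-resolution order the hunks above implement (the helper names resolve_auto_fee / resolve_monitor_fee are illustrative and not code from this diff; fee values come from MARKET_FEES):

# Illustrative only — mirrors the resolution order in cmd_auto and cmd_monitor.
MARKET_FEES = {"us": 2.0, "cn": 5.0}

def resolve_auto_fee(market: str, cli_fixed_fee: float) -> float:
    # cmd_auto: an explicit --fixed-fee (> 0) wins; otherwise fall back to the table.
    return cli_fixed_fee if cli_fixed_fee > 0 else MARKET_FEES.get(market, 0.0)

def resolve_monitor_fee(market: str, cli_fixed_fee: float) -> float:
    # cmd_monitor: always take the per-market table entry; --fixed-fee only covers unlisted markets.
    return MARKET_FEES.get(market, cli_fixed_fee)

assert resolve_auto_fee("cn", 0.0) == 5.0     # A-share trade charged 5 CNY by default
assert resolve_auto_fee("us", 1.5) == 1.5     # explicit CLI override wins for auto
assert resolve_monitor_fee("us", 1.5) == 2.0  # monitor keeps the table value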
universe_history.py Normal file

@@ -0,0 +1,230 @@
"""
Point-in-time index membership reconstruction — fixes survivorship bias.
Approach: Wikipedia's "Selected changes to the list of S&P 500 components"
table lists every add/remove event (394 rows back to 1976, as of 2026). We
start from today's membership and walk the change log *backward*:
- An 'Added' ticker on date D was NOT a member before D.
- A 'Removed' ticker on date D WAS a member before D.
Applied iteratively, this yields the set of members on any historical date.
The membership info is cached in data/sp500_history.json so Wikipedia is hit
at most once per day. The cache stores per-ticker membership intervals:
{ "ticker": [[start, end_or_null], ...] }
where dates are YYYY-MM-DD strings.
"""
import io
import json
import os
import urllib.request
from datetime import date, datetime
import pandas as pd
CACHE_DIR = "data"
_HEADERS = {"User-Agent": "Mozilla/5.0 (quant-backtest)"}
# ---------------------------------------------------------------------------
# Fetch + parse Wikipedia
# ---------------------------------------------------------------------------
def _fetch_sp500_tables() -> tuple[pd.DataFrame, pd.DataFrame]:
"""Return (current_list, changes_log) from the S&P 500 Wikipedia page."""
url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
req = urllib.request.Request(url, headers=_HEADERS)
with urllib.request.urlopen(req) as resp:
html = resp.read().decode("utf-8")
tables = pd.read_html(io.StringIO(html))
current = tables[0]
changes = tables[1]
changes.columns = [
"_".join(c).strip() if isinstance(c, tuple) else c
for c in changes.columns
]
changes.columns = [
c.replace("Effective Date_Effective Date", "Date") for c in changes.columns
]
return current, changes
def _normalize_ticker(t: str) -> str:
"""Yahoo Finance ticker format: BRK.B → BRK-B."""
return str(t).replace(".", "-").strip()
# ---------------------------------------------------------------------------
# Membership reconstruction
# ---------------------------------------------------------------------------
def build_sp500_history() -> dict[str, list[list[str | None]]]:
"""
Reconstruct per-ticker membership intervals.
Returns
-------
dict: ticker -> list of [start_date, end_date_or_None] pairs.
end_date=None means the ticker is still a member as of today.
Dates are YYYY-MM-DD strings.
Algorithm: start from today's set of members, walk the change log from
newest to oldest. For each event on date D:
- The 'Added' ticker: its currently open interval must have started on D,
so finalize it as [D, end] — the ticker was NOT a member before D.
- The 'Removed' ticker: it was a member up to D (exclusive).
Open a new interval ending on D (start unknown for now; it is filled in
by an earlier 'Added' event, or left open if membership predates the log).
After the walk, any ticker still "open" (never closed backward) has an
interval reaching back before the earliest logged change.
"""
current, changes = _fetch_sp500_tables()
current_tickers = {_normalize_ticker(s) for s in current["Symbol"].tolist()}
# Parse change log
changes["dt"] = pd.to_datetime(changes["Date"], errors="coerce")
changes = changes.dropna(subset=["dt"]).sort_values("dt", ascending=False)
# For each ticker, collect intervals [start, end].
# We track a "current open interval" per ticker during the backward walk.
# intervals[ticker] = list of [start, end] completed intervals (oldest-first).
# open_start[ticker] = start date of the currently open (most-recent) interval.
intervals: dict[str, list[list[str | None]]] = {}
open_end: dict[str, str | None] = {} # end of currently-open interval
# Initialize: today's members have an open interval ending = None (still in)
for t in current_tickers:
open_end[t] = None # still a member today
intervals[t] = []
# Track the start date of each open interval as we walk backward.
# For a member today, the interval started at the last "Added" event in the
# changes log, OR before the log begins if never added.
# We'll close the interval when we hit the "Added" event going backward.
open_start: dict[str, str | None] = {t: None for t in current_tickers}
for _, row in changes.iterrows():
d = row["dt"].strftime("%Y-%m-%d")
added = row.get("Added_Ticker")
removed = row.get("Removed_Ticker")
if pd.notna(added):
a = _normalize_ticker(added)
# This ticker was added on d → its open interval starts on d.
if a in open_end:
open_start[a] = d
# Finalize the current open interval
intervals[a].append([d, open_end[a]])
# Pop: no further open interval backward in time for this ticker
# (unless 'Removed' opens a new older one below)
del open_end[a]
if pd.notna(removed):
r = _normalize_ticker(removed)
# This ticker was removed on d → it WAS a member before d.
# Open a new interval ending on d (start unknown yet).
if r not in open_end:
intervals.setdefault(r, [])
open_end[r] = d # end of the new older interval
# Any ticker still with an open interval → start predates the log.
# Record the start as None, meaning "member since before the logged history begins".
for t, end in open_end.items():
intervals.setdefault(t, []).append([None, end])
# Sort intervals per ticker oldest→newest
for t, ivs in intervals.items():
ivs.sort(key=lambda iv: (iv[0] or "0000-00-00"))
return intervals
# ---------------------------------------------------------------------------
# Cache I/O
# ---------------------------------------------------------------------------
def _cache_path() -> str:
return os.path.join(CACHE_DIR, "sp500_history.json")
def load_sp500_history(force_refresh: bool = False) -> dict[str, list[list[str | None]]]:
"""Load cached membership history, or rebuild if stale (>1 day old)."""
path = _cache_path()
if not force_refresh and os.path.exists(path):
try:
with open(path) as f:
data = json.load(f)
if data.get("date") == str(date.today()):
return data["intervals"]
except Exception:
pass
print("--- Rebuilding S&P 500 membership history from Wikipedia ---")
intervals = build_sp500_history()
os.makedirs(CACHE_DIR, exist_ok=True)
with open(path, "w") as f:
json.dump({"date": str(date.today()), "intervals": intervals}, f)
print(f"--- Cached {len(intervals)} tickers' membership intervals ---")
return intervals
# ---------------------------------------------------------------------------
# Convert intervals → aligned mask DataFrame
# ---------------------------------------------------------------------------
def membership_mask(dates: pd.DatetimeIndex,
intervals: dict[str, list[list[str | None]]] | None = None,
tickers: list[str] | None = None) -> pd.DataFrame:
"""
Boolean DataFrame: rows = dates, columns = tickers.
True where the ticker was an S&P 500 member on that date.
If `tickers` is given, restrict columns to that list (useful for aligning
with a price DataFrame). Otherwise, include all tickers ever a member.
"""
if intervals is None:
intervals = load_sp500_history()
cols = tickers if tickers is not None else sorted(intervals.keys())
# Tickers not in `intervals` (e.g. SPY, benchmarks, ETFs) are treated as
# always-members so callers can pass the full price matrix through
# mask_prices without zeroing out benchmark series.
mask = pd.DataFrame(False, index=dates, columns=cols)
for t in cols:
if t not in intervals:
mask[t] = True
continue
for start, end in intervals[t]:
s = pd.Timestamp(start) if start else dates[0]
e = pd.Timestamp(end) if end else dates[-1] + pd.Timedelta(days=1)
# Interval semantics: member on [start, end). A ticker removed on
# date D was no longer a member on D.
mask.loc[(mask.index >= s) & (mask.index < e), t] = True
return mask
def all_tickers_ever(intervals: dict | None = None) -> list[str]:
"""All tickers that were ever S&P 500 members (for price data fetching)."""
if intervals is None:
intervals = load_sp500_history()
return sorted(intervals.keys())
def mask_prices(prices: pd.DataFrame,
intervals: dict | None = None) -> pd.DataFrame:
"""
Return a copy of `prices` with NaN set for (date, ticker) pairs where
the ticker was not an S&P 500 member on that date.
This is the key survivorship-bias fix: strategies compute signals from
the masked price data, so they naturally cannot select stocks outside
the point-in-time index membership.
Warm-up note: a newly-added member needs sufficient non-NaN history for
its rolling windows to produce a valid signal. For this codebase's
~252-day lookbacks, a stock becomes "selectable" roughly 1 year after
joining. This is conservative but correct: before that, we have no
legitimate signal anyway.
"""
mask = membership_mask(prices.index, intervals, tickers=list(prices.columns))
return prices.where(mask)
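
A short end-to-end sketch of how these helpers compose (not part of the commit; the price CSV path is an assumption):

import pandas as pd
import universe_history as uh

prices = pd.read_csv("data/us_pit_close.csv", index_col=0, parse_dates=True)  # assumed path
intervals = uh.load_sp500_history()             # cached once per day in data/sp500_history.json
pit_prices = uh.mask_prices(prices, intervals)  # NaN outside each ticker's membership windows
member_counts = uh.membership_mask(prices.index, intervals).sum(axis=1)
print(member_counts.tail())  # number of point-in-time members on recent dates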