# quant/research/fetch_historical.py
"""
Fetch price history for all tickers that were ever S&P 500 members — including
delisted ones — and save to data/us_pit.csv. This is the foundation for a
survivorship-bias-free backtest.
NOTE: Yahoo Finance no longer serves price data for many fully-delisted tickers
(bankruptcies, old mergers). Those are silently skipped. The result is still
a major improvement over "today's S&P 500 extrapolated 10 years back", but it
is NOT a perfect point-in-time dataset — only a dataset where the universe
mask is correct at each date. A subset of worst-outcome tickers (e.g., ABK,
ACAS) will be missing entirely. This caveat is documented in the run summary.
"""
import os
from datetime import datetime, timedelta
import pandas as pd
import yfinance as yf
import universe_history as uh
DATA_DIR = "data"  # all CSV outputs live here
OUT_PATH = os.path.join(DATA_DIR, "us_pit.csv")  # combined close-price panel
YEARS = 10  # lookback window, in years, for a full download
BATCH_SIZE = 50  # tickers per yf.download call (keeps requests manageable)
def _field_out_paths() -> dict[str, str]:
    """Map each OHLCV field name to the CSV path where its panel is saved."""
    filenames = {
        "Close": "us_pit_close.csv",
        "High": "us_pit_high.csv",
        "Low": "us_pit_low.csv",
        "Volume": "us_pit_volume.csv",
    }
    return {field: os.path.join(DATA_DIR, fname) for field, fname in filenames.items()}
def fetch_all_historical(force: bool = False) -> pd.DataFrame:
    """Fetch (or incrementally update) the point-in-time close-price panel.

    Behavior:
      * ``force=True`` ignores any cached CSV and re-downloads everything.
      * Otherwise, if ``data/us_pit.csv`` already exists:
          - when it holds every ticker, only rows after the last cached date
            are downloaded and appended;
          - when tickers are missing, only those columns are downloaded and
            merged into the cached frame.

    Args:
        force: re-download from scratch even if a cached CSV exists.

    Returns:
        DataFrame indexed by date, one column per ticker (universe + SPY).

    Raises:
        RuntimeError: a full download was required but returned no data.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    intervals = uh.load_sp500_history()
    tickers = sorted(set(uh.all_tickers_ever(intervals) + ["SPY"]))

    existing = None
    if os.path.exists(OUT_PATH) and not force:
        existing = pd.read_csv(OUT_PATH, index_col=0, parse_dates=True)
        missing = [t for t in tickers if t not in existing.columns]
        if not missing:
            # All tickers cached: just append dates newer than the last row.
            last_date = existing.index[-1]
            if (datetime.now() - last_date.to_pydatetime()).days < 2:
                print(f"--- us_pit.csv already up to date: {existing.shape} ---")
                return existing
            tickers = list(existing.columns)
            start = (last_date + timedelta(days=1)).strftime("%Y-%m-%d")
            print(f"--- Appending new dates from {start} for {len(tickers)} tickers ---")
            new = _download_batched(tickers, start=start)
            if new is not None and not new.empty:
                combined = pd.concat([existing, new]).sort_index()
                # Re-downloaded overlap days win over cached rows.
                combined = combined[~combined.index.duplicated(keep="last")]
                combined.to_csv(OUT_PATH)
                print(f"--- Saved {combined.shape} to {OUT_PATH} ---")
                return combined
            return existing
        print(f"--- Have {existing.shape[1]} cols; need {len(missing)} more ---")
        tickers = missing

    start = (datetime.now() - timedelta(days=365 * YEARS)).strftime("%Y-%m-%d")
    new = _download_batched(tickers, start=start)

    if new is None or new.empty:
        # BUG FIX: previously an empty/failed download fell through to
        # `combined = new`, which either crashed on None.to_csv or clobbered
        # the cached CSV with an empty frame. Keep the cache intact instead.
        if existing is not None:
            print("--- Download returned no data; keeping existing file ---")
            return existing
        raise RuntimeError("No historical price data downloaded")

    if existing is not None:
        # Align the new columns onto the cached date index, then append any
        # dates present only in the fresh download.
        combined = pd.concat([existing, new.reindex(existing.index)], axis=1)
        new_only_idx = new.index.difference(existing.index)
        if len(new_only_idx) > 0:
            extra = new.loc[new_only_idx].reindex(columns=combined.columns)
            combined = pd.concat([combined, extra]).sort_index()
    else:
        combined = new

    combined.to_csv(OUT_PATH)
    print(f"--- Saved {combined.shape} to {OUT_PATH} ---")
    return combined
def fetch_all_historical_ohlcv(force: bool = False) -> dict[str, pd.DataFrame]:
    """Download Close/High/Low/Volume panels for every ticker that was ever
    an S&P 500 member (plus SPY) and persist each panel to its own CSV.

    Note: ``force`` is accepted for interface symmetry but is currently
    unused — this function always performs a fresh download.

    Returns:
        Dict keyed by lowercase field name ("close", "high", "low",
        "volume"), each a date-indexed DataFrame with one column per ticker.

    Raises:
        RuntimeError: no data at all came back from the download.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    universe = uh.all_tickers_ever(uh.load_sp500_history()) + ["SPY"]
    universe = sorted(set(universe))
    since = (datetime.now() - timedelta(days=365 * YEARS)).strftime("%Y-%m-%d")
    panels = _download_batched_fields(
        universe, start=since, fields=["Close", "High", "Low", "Volume"]
    )
    if not panels:
        raise RuntimeError("No PIT OHLCV data downloaded")

    # The close panel is additionally written to the legacy combined path.
    close = panels["Close"]
    close.to_csv(OUT_PATH)
    print(f"--- Saved {close.shape} to {OUT_PATH} ---")

    result: dict[str, pd.DataFrame] = {"close": close}
    for field, path in _field_out_paths().items():
        panel = panels[field]
        panel.to_csv(path)
        print(f"--- Saved {panel.shape} to {path} ---")
        result[field.lower()] = panel
    return result
def _download_batched(tickers: list[str], start: str) -> pd.DataFrame | None:
    """Download only the Close panel for `tickers`; None if nothing came back."""
    panels = _download_batched_fields(tickers, start=start, fields=["Close"])
    return panels["Close"] if panels else None
def _download_batched_fields(
    tickers: list[str],
    start: str,
    fields: list[str],
) -> dict[str, pd.DataFrame]:
    """Download the requested price fields for `tickers` in BATCH_SIZE chunks.

    Best-effort: a batch that raises is logged and skipped. Columns that are
    entirely NaN (tickers Yahoo no longer serves) are dropped. Every requested
    field is present in the result; a field with no data maps to an empty
    DataFrame.
    """
    collected: dict[str, list[pd.DataFrame]] = {f: [] for f in fields}
    total = len(tickers)

    for offset in range(0, total, BATCH_SIZE):
        chunk = tickers[offset:offset + BATCH_SIZE]
        print(f" [{offset}/{total}] fetching {len(chunk)} tickers...", flush=True)
        try:
            raw = yf.download(chunk, start=start, auto_adjust=True,
                              progress=False, threads=True)
            if raw.empty:
                continue
            is_multi = isinstance(raw.columns, pd.MultiIndex)
            for f in fields:
                if is_multi:
                    panel = raw[f]
                else:
                    # Single-ticker response has flat columns; relabel the
                    # field column with the ticker symbol.
                    panel = raw[[f]].rename(columns={f: chunk[0]})
                panel = panel.dropna(axis=1, how="all")
                if not panel.empty:
                    collected[f].append(panel)
        except Exception as e:
            print(f" batch failed: {e}")

    out: dict[str, pd.DataFrame] = {}
    for f, parts in collected.items():
        if parts:
            merged = pd.concat(parts, axis=1).sort_index()
            # A ticker can appear in more than one batch; keep the first.
            out[f] = merged.loc[:, ~merged.columns.duplicated()]
        else:
            out[f] = pd.DataFrame()
    return out
if __name__ == "__main__":
    # Script entry point: build/refresh the close-only PIT panel.
    fetch_all_historical()