"""
|
|
Fetch price history for all tickers that were ever S&P 500 members — including
|
|
delisted ones — and save to data/us_pit.csv. This is the foundation for a
|
|
survivorship-bias-free backtest.
|
|
|
|
NOTE: Yahoo Finance no longer serves price data for many fully-delisted tickers
|
|
(bankruptcies, old mergers). Those are silently skipped. The result is still
|
|
a major improvement over "today's S&P 500 extrapolated 10 years back", but it
|
|
is NOT a perfect point-in-time dataset — only a dataset where the universe
|
|
mask is correct at each date. A subset of worst-outcome tickers (e.g., ABK,
|
|
ACAS) will be missing entirely. This caveat is documented in the run summary.
|
|
"""

import os
from datetime import datetime, timedelta

import pandas as pd
import yfinance as yf

import universe_history as uh
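
# Assumed universe_history interface (a sketch; that module is not shown in
# this file): load_sp500_history() yields the index's membership history, and
# all_tickers_ever(intervals) flattens it into one list of every ticker that
# was ever a member. Only those two calls are relied on below.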

DATA_DIR = "data"
OUT_PATH = os.path.join(DATA_DIR, "us_pit.csv")
YEARS = 10
BATCH_SIZE = 50


def _field_out_paths() -> dict[str, str]:
    """Output CSV path for each OHLCV field panel."""
    return {
        "Close": os.path.join(DATA_DIR, "us_pit_close.csv"),
        "High": os.path.join(DATA_DIR, "us_pit_high.csv"),
        "Low": os.path.join(DATA_DIR, "us_pit_low.csv"),
        "Volume": os.path.join(DATA_DIR, "us_pit_volume.csv"),
    }


def fetch_all_historical(force: bool = False) -> pd.DataFrame:
    """Download (or incrementally update) the close-price panel for every
    ticker that was ever an S&P 500 member, plus SPY as a benchmark."""
    os.makedirs(DATA_DIR, exist_ok=True)
    intervals = uh.load_sp500_history()
    tickers = uh.all_tickers_ever(intervals) + ["SPY"]
    tickers = sorted(set(tickers))

    existing = None
    if os.path.exists(OUT_PATH) and not force:
        existing = pd.read_csv(OUT_PATH, index_col=0, parse_dates=True)
        missing = [t for t in tickers if t not in existing.columns]
        if not missing:
            # All tickers already present: just append any newer dates.
            last_date = existing.index[-1]
            if (datetime.now() - last_date.to_pydatetime()).days < 2:
                print(f"--- us_pit.csv already up to date: {existing.shape} ---")
                return existing
            tickers = list(existing.columns)
            start = (last_date + timedelta(days=1)).strftime("%Y-%m-%d")
            print(f"--- Appending new dates from {start} for {len(tickers)} tickers ---")
            new = _download_batched(tickers, start=start)
            if new is not None and not new.empty:
                combined = pd.concat([existing, new]).sort_index()
                combined = combined[~combined.index.duplicated(keep="last")]
                combined.to_csv(OUT_PATH)
                print(f"--- Saved {combined.shape} to {OUT_PATH} ---")
                return combined
            return existing
        else:
            print(f"--- Have {existing.shape[1]} cols; need {len(missing)} more ---")
            tickers = missing

    start = (datetime.now() - timedelta(days=365 * YEARS)).strftime("%Y-%m-%d")
    new = _download_batched(tickers, start=start)

    if existing is not None and new is not None and not new.empty:
        # Align the newly fetched columns onto the existing date index ...
        combined = pd.concat([existing, new.reindex(existing.index)], axis=1)
        # ... then append any rows that exist only in the new download.
        new_only_idx = new.index.difference(existing.index)
        if len(new_only_idx) > 0:
            combined_new = new.loc[new_only_idx].reindex(columns=combined.columns)
            combined = pd.concat([combined, combined_new]).sort_index()
    elif new is not None and not new.empty:
        combined = new
    elif existing is not None:
        # Download failed entirely; keep what we have rather than overwrite it.
        return existing
    else:
        raise RuntimeError("No price data downloaded")

    combined.to_csv(OUT_PATH)
    print(f"--- Saved {combined.shape} to {OUT_PATH} ---")
    return combined
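
# Illustrative usage of the panel above (a sketch, not part of the pipeline;
# it assumes the CSV layout written by fetch_all_historical):
#
#   prices = fetch_all_historical()
#   spy = prices["SPY"].dropna()           # benchmark series
#   members = prices.columns.drop("SPY")   # every ticker ever in the index
#
# Rows are trading dates and columns are tickers; cells are NaN outside the
# dates Yahoo has data for, which is why the separate membership mask from
# universe_history is still needed to know who was in the index on each date.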


def fetch_all_historical_ohlcv(force: bool = False) -> dict[str, pd.DataFrame]:
    """Download Close/High/Low/Volume panels for the full historical universe.

    Unlike fetch_all_historical, this always re-downloads from scratch; the
    `force` argument is accepted only for interface symmetry and is unused.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    intervals = uh.load_sp500_history()
    tickers = uh.all_tickers_ever(intervals) + ["SPY"]
    tickers = sorted(set(tickers))
    start = (datetime.now() - timedelta(days=365 * YEARS)).strftime("%Y-%m-%d")
    panels = _download_batched_fields(tickers, start=start,
                                      fields=["Close", "High", "Low", "Volume"])
    # _download_batched_fields always returns a dict keyed by field, so test
    # the Close panel itself rather than the truthiness of the dict.
    if panels["Close"].empty:
        raise RuntimeError("No PIT OHLCV data downloaded")

    close = panels["Close"]
    close.to_csv(OUT_PATH)
    print(f"--- Saved {close.shape} to {OUT_PATH} ---")
    result: dict[str, pd.DataFrame] = {"close": close}
    for field, path in _field_out_paths().items():
        panel = panels[field]
        panel.to_csv(path)
        print(f"--- Saved {panel.shape} to {path} ---")
        result[field.lower()] = panel
    return result
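
# Reading the saved panels back (a sketch; the paths are this module's own
# constants, and dollar_volume is just one possible downstream use):
#
#   import pandas as pd
#   close = pd.read_csv("data/us_pit_close.csv", index_col=0, parse_dates=True)
#   volume = pd.read_csv("data/us_pit_volume.csv", index_col=0, parse_dates=True)
#   dollar_volume = close * volume  # e.g., as input to a liquidity filter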


def _download_batched(tickers: list[str], start: str) -> pd.DataFrame | None:
    """Close-only convenience wrapper around _download_batched_fields."""
    panels = _download_batched_fields(tickers, start=start, fields=["Close"])
    if panels["Close"].empty:
        return None
    return panels["Close"]


def _download_batched_fields(
    tickers: list[str],
    start: str,
    fields: list[str],
) -> dict[str, pd.DataFrame]:
    """Download the requested fields in batches of BATCH_SIZE tickers.

    Returns a dict mapping each field to a wide DataFrame (dates x tickers);
    a field maps to an empty DataFrame if nothing was retrieved for it.
    """
    frames: dict[str, list[pd.DataFrame]] = {field: [] for field in fields}
    n = len(tickers)
    for i in range(0, n, BATCH_SIZE):
        batch = tickers[i:i + BATCH_SIZE]
        print(f"  [{i}/{n}] fetching {len(batch)} tickers...", flush=True)
        try:
            raw = yf.download(batch, start=start, auto_adjust=True,
                              progress=False, threads=True)
            if raw.empty:
                continue
            for field in fields:
                if isinstance(raw.columns, pd.MultiIndex):
                    # Multi-ticker download: columns are (field, ticker) pairs.
                    panel = raw[field]
                else:
                    # Single-ticker download: flat columns; restore the ticker name.
                    panel = raw[[field]].rename(columns={field: batch[0]})
                panel = panel.dropna(axis=1, how="all")
                if not panel.empty:
                    frames[field].append(panel)
        except Exception as e:
            print(f"  batch failed: {e}")
    result = {}
    for field, field_frames in frames.items():
        if field_frames:
            panel = pd.concat(field_frames, axis=1).sort_index()
            panel = panel.loc[:, ~panel.columns.duplicated()]
            result[field] = panel
        else:
            result[field] = pd.DataFrame()
    return result
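
# For reference: with several tickers, yf.download returns MultiIndex columns
# keyed (field, ticker), so raw["Close"] is already a dates-x-tickers frame;
# with a single ticker (on some yfinance versions) the columns are flat, which
# the branch above normalizes. An illustrative check:
#
#   raw = yf.download(["AAPL", "MSFT"], start="2024-01-01", progress=False)
#   isinstance(raw.columns, pd.MultiIndex)  # True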


if __name__ == "__main__":
    fetch_all_historical()
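    # To also materialize the High/Low/Volume panels (not just Close), call
    # fetch_all_historical_ohlcv() here instead; note it always re-downloads.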