""" Point-in-time index membership reconstruction — fixes survivorship bias. Approach: Wikipedia's "Selected changes to the list of S&P 500 components" table lists every add/remove event (394 rows back to 1976, as of 2026). We start from today's membership and walk the change log *backward*: - An 'Added' ticker on date D was NOT a member before D. - A 'Removed' ticker on date D WAS a member before D. Applied iteratively, this yields the set of members on any historical date. The membership info is cached in data/sp500_history.json so Wikipedia is hit at most once per day. The cache stores per-ticker membership intervals: { "ticker": [[start, end_or_null], ...] } where dates are YYYY-MM-DD strings. """ import io import json import os import urllib.request from datetime import date, datetime import pandas as pd CACHE_DIR = "data" _HEADERS = {"User-Agent": "Mozilla/5.0 (quant-backtest)"} # --------------------------------------------------------------------------- # Fetch + parse Wikipedia # --------------------------------------------------------------------------- def _fetch_sp500_tables() -> tuple[pd.DataFrame, pd.DataFrame]: """Return (current_list, changes_log) from the S&P 500 Wikipedia page.""" url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" req = urllib.request.Request(url, headers=_HEADERS) with urllib.request.urlopen(req) as resp: html = resp.read().decode("utf-8") tables = pd.read_html(io.StringIO(html)) current = tables[0] changes = tables[1] changes.columns = [ "_".join(c).strip() if isinstance(c, tuple) else c for c in changes.columns ] changes.columns = [ c.replace("Effective Date_Effective Date", "Date") for c in changes.columns ] return current, changes def _normalize_ticker(t: str) -> str: """Yahoo Finance ticker format: BRK.B → BRK-B.""" return str(t).replace(".", "-").strip() # --------------------------------------------------------------------------- # Membership reconstruction # --------------------------------------------------------------------------- def build_sp500_history() -> dict[str, list[list[str | None]]]: """ Reconstruct per-ticker membership intervals. Returns ------- dict: ticker -> list of [start_date, end_date_or_None] pairs. end_date=None means the ticker is still a member as of today. Dates are YYYY-MM-DD strings. Algorithm: start from today's set of members, walk the change log from newest to oldest. For each event on date D: - The 'Added' ticker: its current (open) interval starts on D. Close it: [..., D] — it was NOT a member before D. - The 'Removed' ticker: it was a member up to D (exclusive). Open a new interval ending on D (start unknown for now; will be closed by an earlier event or left open-start). After the walk, any ticker still "open" (never closed backward) has an interval reaching back before the earliest logged change. """ current, changes = _fetch_sp500_tables() current_tickers = {_normalize_ticker(s) for s in current["Symbol"].tolist()} # Parse change log changes["dt"] = pd.to_datetime(changes["Date"], errors="coerce") changes = changes.dropna(subset=["dt"]).sort_values("dt", ascending=False) # For each ticker, collect intervals [start, end]. # We track a "current open interval" per ticker during the backward walk. # intervals[ticker] = list of [start, end] completed intervals (oldest-first). # open_start[ticker] = start date of the currently open (most-recent) interval. 
    intervals: dict[str, list[list[str | None]]] = {}
    open_end: dict[str, str | None] = {}  # end of the currently-open interval

    # Initialize: today's members have an open interval with end = None (still in).
    for t in current_tickers:
        open_end[t] = None
        intervals[t] = []

    # Walk the change log newest → oldest. A ticker that is a member today has
    # its open interval closed when we reach the 'Added' event that put it in
    # the index; if no such event exists in the log, its start predates the
    # logged history and is recorded as None after the walk.
    for _, row in changes.iterrows():
        d = row["dt"].strftime("%Y-%m-%d")
        added = row.get("Added_Ticker")
        removed = row.get("Removed_Ticker")

        if pd.notna(added):
            a = _normalize_ticker(added)
            # This ticker was added on d → its open interval starts on d.
            if a in open_end:
                # Finalize the current open interval.
                intervals[a].append([d, open_end[a]])
                # Pop: no open interval further back in time for this ticker
                # (a 'Removed' event at an earlier date may open an older one).
                del open_end[a]

        if pd.notna(removed):
            r = _normalize_ticker(removed)
            # This ticker was removed on d → it WAS a member before d.
            # Open a new interval ending on d (start unknown yet).
            if r not in open_end:
                intervals.setdefault(r, [])
                open_end[r] = d  # end of the new, older interval

    # Any ticker still with an open interval → its start predates the log.
    # Record the start as None ("member since before the logged history").
    for t, end in open_end.items():
        intervals.setdefault(t, []).append([None, end])

    # Sort intervals per ticker oldest → newest (None starts sort first).
    for t, ivs in intervals.items():
        ivs.sort(key=lambda iv: (iv[0] or "0000-00-00"))

    return intervals


# ---------------------------------------------------------------------------
# Cache I/O
# ---------------------------------------------------------------------------

def _cache_path() -> str:
    return os.path.join(CACHE_DIR, "sp500_history.json")


def load_sp500_history(force_refresh: bool = False) -> dict[str, list[list[str | None]]]:
    """Load cached membership history, rebuilding it if the cache is not from today."""
    path = _cache_path()
    if not force_refresh and os.path.exists(path):
        try:
            with open(path) as f:
                data = json.load(f)
            if data.get("date") == str(date.today()):
                return data["intervals"]
        except Exception:
            pass

    print("--- Rebuilding S&P 500 membership history from Wikipedia ---")
    intervals = build_sp500_history()
    os.makedirs(CACHE_DIR, exist_ok=True)
    with open(path, "w") as f:
        json.dump({"date": str(date.today()), "intervals": intervals}, f)
    print(f"--- Cached {len(intervals)} tickers' membership intervals ---")
    return intervals
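
# Example cache payload written by load_sp500_history (hypothetical ticker names
# and dates; the structure matches the json.dump call above and the module
# docstring):
#   {"date": "2026-01-15",
#    "intervals": {"NEWCO": [["2024-06-03", null]],
#                  "OLDCO": [[null, "2024-06-03"]]}}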

# ---------------------------------------------------------------------------
# Convert intervals → aligned mask DataFrame
# ---------------------------------------------------------------------------

def membership_mask(dates: pd.DatetimeIndex,
                    intervals: dict[str, list[list[str | None]]] | None = None,
                    tickers: list[str] | None = None) -> pd.DataFrame:
    """
    Boolean DataFrame: rows = dates, columns = tickers. True where the ticker
    was an S&P 500 member on that date.

    If `tickers` is given, restrict the columns to that list (useful for
    aligning with a price DataFrame). Otherwise, include all tickers ever a
    member.
    """
    if intervals is None:
        intervals = load_sp500_history()

    cols = tickers if tickers is not None else sorted(intervals.keys())
    # Tickers not in `intervals` (e.g. SPY, benchmarks, ETFs) are treated as
    # always-members so callers can pass the full price matrix through
    # mask_prices without zeroing out benchmark series.
    mask = pd.DataFrame(False, index=dates, columns=cols)
    for t in cols:
        if t not in intervals:
            mask[t] = True
            continue
        for start, end in intervals[t]:
            s = pd.Timestamp(start) if start else dates[0]
            e = pd.Timestamp(end) if end else dates[-1] + pd.Timedelta(days=1)
            # Interval semantics: member on [start, end). A ticker removed on
            # date D was no longer a member on D.
            mask.loc[(mask.index >= s) & (mask.index < e), t] = True
    return mask


def all_tickers_ever(intervals: dict | None = None) -> list[str]:
    """All tickers that were ever S&P 500 members (for price-data fetching)."""
    if intervals is None:
        intervals = load_sp500_history()
    return sorted(intervals.keys())


def mask_prices(prices: pd.DataFrame, intervals: dict | None = None) -> pd.DataFrame:
    """
    Return a copy of `prices` with NaN set for (date, ticker) pairs where the
    ticker was not an S&P 500 member on that date.

    This is the key survivorship-bias fix: strategies compute signals from the
    masked price data, so they naturally cannot select stocks outside the
    point-in-time index membership.

    Warm-up note: a newly-added member needs sufficient non-NaN history for
    its rolling windows to produce a valid signal. For this codebase's
    ~252-day lookbacks, a stock becomes "selectable" roughly one year after
    joining. This is conservative but correct: before that, we have no
    legitimate signal anyway.
    """
    mask = membership_mask(prices.index, intervals, tickers=list(prices.columns))
    return prices.where(mask)
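

if __name__ == "__main__":
    # Minimal usage sketch (illustrative; the date range below is arbitrary).
    # The first run fetches Wikipedia, after which data/sp500_history.json is used.
    hist = load_sp500_history()
    dates = pd.bdate_range("2020-01-01", "2020-12-31")
    mask = membership_mask(dates, hist)
    print(f"{len(hist)} tickers have ever been members; "
          f"{int(mask.loc[dates[-1]].sum())} were members on {dates[-1].date()}")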