import json import time from pathlib import Path from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen import numpy as np import pandas as pd DEFAULT_SEC_USER_AGENT = "quant-research/0.1 gahow@example.com" DEFAULT_LAG_DAYS = 60 FRAME_SLEEP_SECONDS = 0.2 QUARTERLY_DURATION_CONCEPTS = { "net_income": [("NetIncomeLoss", "USD"), ("ProfitLoss", "USD")], "gross_profit": [("GrossProfit", "USD")], } QUARTERLY_INSTANT_CONCEPTS = { "equity": [ ("StockholdersEquityIncludingPortionAttributableToNoncontrollingInterest", "USD"), ("StockholdersEquity", "USD"), ], "assets": [("Assets", "USD")], "shares": [ ("CommonStockSharesOutstanding", "shares"), ("EntityCommonStockSharesOutstanding", "shares"), ], } def _normalize_ticker(ticker: str) -> str: return ticker.upper().replace(".", "-") def _frame_code(period_end: pd.Timestamp, instant: bool) -> str: quarter = ((period_end.month - 1) // 3) + 1 suffix = "I" if instant else "" return f"CY{period_end.year}Q{quarter}{suffix}" def _cache_dir(data_dir: str) -> Path: path = Path(data_dir) / "sec_frames" path.mkdir(parents=True, exist_ok=True) return path def load_sec_ticker_map(data_dir: str = "data", user_agent: str = DEFAULT_SEC_USER_AGENT) -> pd.DataFrame: cache_path = Path(data_dir) / "sec_company_tickers.json" if cache_path.exists(): raw = json.loads(cache_path.read_text()) else: request = Request( "https://www.sec.gov/files/company_tickers.json", headers={"User-Agent": user_agent, "Accept": "application/json"}, ) with urlopen(request, timeout=30) as response: raw = json.loads(response.read().decode("utf-8")) cache_path.write_text(json.dumps(raw)) rows = [] for item in raw.values(): rows.append( { "ticker": _normalize_ticker(item["ticker"]), "cik": int(item["cik_str"]), "title": item["title"], } ) return pd.DataFrame(rows).drop_duplicates(subset=["ticker"]).sort_values("ticker").reset_index(drop=True) def _load_or_fetch_frame( tag: str, unit: str, frame_code: str, data_dir: str = "data", user_agent: str = DEFAULT_SEC_USER_AGENT, ) -> dict | None: cache_path = _cache_dir(data_dir) / f"{tag}_{unit}_{frame_code}.json" if cache_path.exists(): return json.loads(cache_path.read_text()) url = f"https://data.sec.gov/api/xbrl/frames/us-gaap/{tag}/{unit}/{frame_code}.json" request = Request(url, headers={"User-Agent": user_agent, "Accept": "application/json"}) try: with urlopen(request, timeout=60) as response: payload = json.loads(response.read().decode("utf-8")) except HTTPError as exc: if exc.code == 404: return None raise except URLError: raise cache_path.write_text(json.dumps(payload)) time.sleep(FRAME_SLEEP_SECONDS) return payload def _frame_to_series(payload: dict | None, cik_to_ticker: dict[int, str]) -> pd.Series: if not payload: return pd.Series(dtype=float) frame = pd.DataFrame(payload.get("data", [])) if frame.empty: return pd.Series(dtype=float) frame = frame.loc[frame["cik"].isin(cik_to_ticker)] if frame.empty: return pd.Series(dtype=float) frame["ticker"] = frame["cik"].map(cik_to_ticker) frame = frame.dropna(subset=["ticker", "val"]) frame = frame.sort_values(["ticker", "end"]) series = frame.groupby("ticker")["val"].last() series.index.name = None return series.astype(float) def _combine_quarterly_panels(panels: list[pd.DataFrame]) -> pd.DataFrame: combined = pd.DataFrame() for panel in panels: if panel.empty: continue if combined.empty: combined = panel.copy() continue combined = combined.combine_first(panel) return combined.sort_index() def fetch_sec_quarterly_panels( tickers: list[str], price_index: pd.Index, data_dir: str = "data", user_agent: str = DEFAULT_SEC_USER_AGENT, ) -> dict[str, pd.DataFrame]: normalized_to_original = {_normalize_ticker(t): t for t in tickers} ticker_map = load_sec_ticker_map(data_dir=data_dir, user_agent=user_agent) ticker_map = ticker_map.loc[ticker_map["ticker"].isin(normalized_to_original)] cik_to_ticker = { int(row.cik): normalized_to_original[row.ticker] for row in ticker_map.itertuples(index=False) if row.ticker in normalized_to_original } if not cik_to_ticker: return {name: pd.DataFrame(index=pd.Index([], dtype="datetime64[ns]"), columns=tickers) for name in ( list(QUARTERLY_DURATION_CONCEPTS) + list(QUARTERLY_INSTANT_CONCEPTS) )} min_year = int(price_index.min().year) - 1 max_year = int(price_index.max().year) quarter_ends = [] for year in range(min_year, max_year + 1): for month, day in ((3, 31), (6, 30), (9, 30), (12, 31)): quarter_ends.append(pd.Timestamp(year=year, month=month, day=day)) results: dict[str, list[pd.DataFrame]] = {name: [] for name in QUARTERLY_DURATION_CONCEPTS | QUARTERLY_INSTANT_CONCEPTS} for index, quarter_end in enumerate(quarter_ends, start=1): print(f"--- SEC quarterly frames {index}/{len(quarter_ends)}: {quarter_end.date()} ---") for factor_name, concept_candidates in QUARTERLY_DURATION_CONCEPTS.items(): panel = pd.DataFrame(index=[quarter_end], columns=tickers, dtype=float) for tag, unit in concept_candidates: payload = _load_or_fetch_frame( tag=tag, unit=unit, frame_code=_frame_code(quarter_end, instant=False), data_dir=data_dir, user_agent=user_agent, ) series = _frame_to_series(payload, cik_to_ticker) if not series.empty: for ticker, value in series.items(): if pd.isna(panel.at[quarter_end, ticker]): panel.at[quarter_end, ticker] = value results[factor_name].append(panel) for factor_name, concept_candidates in QUARTERLY_INSTANT_CONCEPTS.items(): panel = pd.DataFrame(index=[quarter_end], columns=tickers, dtype=float) for tag, unit in concept_candidates: payload = _load_or_fetch_frame( tag=tag, unit=unit, frame_code=_frame_code(quarter_end, instant=True), data_dir=data_dir, user_agent=user_agent, ) series = _frame_to_series(payload, cik_to_ticker) if not series.empty: for ticker, value in series.items(): if pd.isna(panel.at[quarter_end, ticker]): panel.at[quarter_end, ticker] = value results[factor_name].append(panel) return {name: _combine_quarterly_panels(panels).reindex(columns=tickers) for name, panels in results.items()} def quarterly_snapshot_to_daily(quarterly_df: pd.DataFrame, daily_index: pd.Index, lag_days: int) -> pd.DataFrame: if quarterly_df.empty: return pd.DataFrame(index=daily_index, columns=quarterly_df.columns, dtype=float) shifted = quarterly_df.copy() shifted.index = pd.DatetimeIndex(shifted.index) + pd.Timedelta(days=lag_days) expanded_index = pd.DatetimeIndex(sorted(set(pd.DatetimeIndex(daily_index)).union(set(shifted.index)))) return shifted.reindex(expanded_index).ffill().reindex(daily_index) def _xsec_rank(df: pd.DataFrame, ascending: bool = True) -> pd.DataFrame: return df.rank(axis=1, pct=True, na_option="keep", ascending=ascending) def build_quarterly_factor_pack( quarterly_data: dict[str, pd.DataFrame], close: pd.DataFrame, lag_days: int = DEFAULT_LAG_DAYS, ) -> dict[str, pd.DataFrame]: daily_index = close.index shares_daily = quarterly_snapshot_to_daily(quarterly_data["shares"], daily_index, lag_days) equity_daily = quarterly_snapshot_to_daily(quarterly_data["equity"], daily_index, lag_days) assets_daily = quarterly_snapshot_to_daily(quarterly_data["assets"], daily_index, lag_days) net_income_ttm = quarterly_data["net_income"].rolling(4, min_periods=4).sum() gross_profit_ttm = quarterly_data["gross_profit"].rolling(4, min_periods=4).sum() assets_yoy = quarterly_data["assets"].shift(4) shares_yoy = quarterly_data["shares"].shift(4) net_income_ttm_daily = quarterly_snapshot_to_daily(net_income_ttm, daily_index, lag_days) gross_profit_ttm_daily = quarterly_snapshot_to_daily(gross_profit_ttm, daily_index, lag_days) assets_yoy_daily = quarterly_snapshot_to_daily(assets_yoy, daily_index, lag_days) shares_yoy_daily = quarterly_snapshot_to_daily(shares_yoy, daily_index, lag_days) market_cap = close * shares_daily book_to_market = equity_daily / market_cap.replace(0.0, np.nan) earnings_yield = net_income_ttm_daily / market_cap.replace(0.0, np.nan) roe = net_income_ttm_daily / equity_daily.replace(0.0, np.nan) gross_profitability = gross_profit_ttm_daily / assets_daily.replace(0.0, np.nan) asset_growth = -(assets_daily / assets_yoy_daily.replace(0.0, np.nan) - 1.0) share_issuance = -(shares_daily / shares_yoy_daily.replace(0.0, np.nan) - 1.0) factor_pack = { "book_to_market": book_to_market, "earnings_yield": earnings_yield, "roe": roe, "gross_profitability": gross_profitability, "asset_growth": asset_growth, "share_issuance": share_issuance, } ranked = { "book_to_market": _xsec_rank(factor_pack["book_to_market"]), "earnings_yield": _xsec_rank(factor_pack["earnings_yield"]), "roe": _xsec_rank(factor_pack["roe"]), "gross_profitability": _xsec_rank(factor_pack["gross_profitability"]), "asset_growth": _xsec_rank(factor_pack["asset_growth"]), "share_issuance": _xsec_rank(factor_pack["share_issuance"]), } factor_pack["composite"] = pd.concat(ranked, axis=1).T.groupby(level=1).mean().T factor_pack["composite"] = factor_pack["composite"].shift(1) return factor_pack def build_exploratory_fundamental_score( close: pd.DataFrame, data_dir: str = "data", lag_days: int = DEFAULT_LAG_DAYS, user_agent: str = DEFAULT_SEC_USER_AGENT, ) -> pd.DataFrame: quarterly = fetch_sec_quarterly_panels( tickers=list(close.columns), price_index=close.index, data_dir=data_dir, user_agent=user_agent, ) return build_quarterly_factor_pack(quarterly, close, lag_days=lag_days)["composite"]