Files
quant/research/us_fundamentals.py
Gahow Wang 541f7bcf5b research: add strategy evaluation and exploration scripts
Add 28 research scripts covering DCA simulation, momentum evaluation,
Sharpe optimization, trend rider analysis, and US fundamentals exploration.
2026-05-14 12:54:08 +08:00

274 lines
11 KiB
Python

import json
import time
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
import numpy as np
import pandas as pd
# User-Agent header sent with every SEC request made by this module.
DEFAULT_SEC_USER_AGENT: str = "quant-research/0.1 gahow@example.com"

# Days added to each quarter-end date before its values become visible on the
# daily grid (see quarterly_snapshot_to_daily).
DEFAULT_LAG_DAYS: int = 60

# Seconds slept after a frame has been downloaded from the SEC.
FRAME_SLEEP_SECONDS: float = 0.2

# Factor name -> ordered (XBRL tag, unit) candidates requested as duration
# frames. Candidates are tried in order; later ones only fill values that
# earlier ones left missing.
QUARTERLY_DURATION_CONCEPTS: dict[str, list[tuple[str, str]]] = {
    "net_income": [
        ("NetIncomeLoss", "USD"),
        ("ProfitLoss", "USD"),
    ],
    "gross_profit": [
        ("GrossProfit", "USD"),
    ],
}

# Factor name -> ordered (XBRL tag, unit) candidates requested as instant
# frames (frame code carries the "I" suffix).
QUARTERLY_INSTANT_CONCEPTS: dict[str, list[tuple[str, str]]] = {
    "equity": [
        ("StockholdersEquityIncludingPortionAttributableToNoncontrollingInterest", "USD"),
        ("StockholdersEquity", "USD"),
    ],
    "assets": [
        ("Assets", "USD"),
    ],
    "shares": [
        ("CommonStockSharesOutstanding", "shares"),
        ("EntityCommonStockSharesOutstanding", "shares"),
    ],
}
def _normalize_ticker(ticker: str) -> str:
    """Return ``ticker`` upper-cased with every ``.`` replaced by ``-``.

    Example: ``"brk.b"`` becomes ``"BRK-B"``.
    """
    uppercased = ticker.upper()
    return "-".join(uppercased.split("."))
def _frame_code(period_end: pd.Timestamp, instant: bool) -> str:
    """Build the ``CY<year>Q<quarter>`` frame code for ``period_end``.

    The quarter number (1-4) is derived from the month of ``period_end``.
    When ``instant`` is true an ``I`` suffix is appended.
    """
    quarter_number = (period_end.month + 2) // 3
    code = f"CY{period_end.year}Q{quarter_number}"
    if instant:
        code += "I"
    return code
def _cache_dir(data_dir: str) -> Path:
    """Return ``<data_dir>/sec_frames``, creating it and any parents if absent."""
    frames_dir = Path(data_dir).joinpath("sec_frames")
    frames_dir.mkdir(parents=True, exist_ok=True)
    return frames_dir
def load_sec_ticker_map(data_dir: str = "data", user_agent: str = DEFAULT_SEC_USER_AGENT) -> pd.DataFrame:
    """Load the SEC ticker -> CIK table, downloading and caching it if needed.

    The raw JSON is cached at ``<data_dir>/sec_company_tickers.json``; when
    that file exists it is used and no request is made.

    Args:
        data_dir: Directory holding the cache file. It is created when the
            table has to be downloaded and the directory does not exist yet.
        user_agent: Value of the ``User-Agent`` header sent to the SEC.

    Returns:
        A DataFrame with columns ``ticker`` (normalized via
        ``_normalize_ticker``), ``cik`` (int) and ``title``, de-duplicated on
        ``ticker`` (first occurrence kept), sorted by ``ticker`` and
        re-indexed from 0. Empty, but with those columns, when the payload
        holds no entries.

    Raises:
        urllib.error.URLError: If the download fails (includes ``HTTPError``).
    """
    cache_path = Path(data_dir) / "sec_company_tickers.json"
    if cache_path.exists():
        raw = json.loads(cache_path.read_text(encoding="utf-8"))
    else:
        request = Request(
            "https://www.sec.gov/files/company_tickers.json",
            headers={"User-Agent": user_agent, "Accept": "application/json"},
        )
        with urlopen(request, timeout=30) as response:
            raw = json.loads(response.read().decode("utf-8"))
        # data_dir may not exist on a first run; without this the write below
        # raises FileNotFoundError after the download already succeeded.
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        cache_path.write_text(json.dumps(raw), encoding="utf-8")

    rows = [
        {
            "ticker": _normalize_ticker(item["ticker"]),
            "cik": int(item["cik_str"]),
            "title": item["title"],
        }
        for item in raw.values()
    ]
    # Naming the columns explicitly keeps the "ticker" column present even
    # when there are no rows, so drop_duplicates/sort_values cannot KeyError.
    table = pd.DataFrame(rows, columns=["ticker", "cik", "title"])
    return table.drop_duplicates(subset=["ticker"]).sort_values("ticker").reset_index(drop=True)
def _load_or_fetch_frame(
    tag: str,
    unit: str,
    frame_code: str,
    data_dir: str = "data",
    user_agent: str = DEFAULT_SEC_USER_AGENT,
) -> dict | None:
    """Return one SEC XBRL frame payload, using an on-disk cache.

    The payload is cached as ``<tag>_<unit>_<frame_code>.json`` inside the
    directory returned by ``_cache_dir(data_dir)``. A cached file is returned
    without any network access.

    Args:
        tag: us-gaap concept tag placed in the request URL.
        unit: Unit segment of the request URL (e.g. ``"USD"``).
        frame_code: Period segment of the request URL (see ``_frame_code``).
        data_dir: Root directory of the cache.
        user_agent: Value of the ``User-Agent`` header sent to the SEC.

    Returns:
        The decoded JSON payload, or ``None`` when the server answers 404.
        A 404 is not cached, so the frame is requested again on a later call.

    Raises:
        urllib.error.HTTPError: For any HTTP error other than 404.
        urllib.error.URLError: For lower-level network failures.
    """
    cache_path = _cache_dir(data_dir) / f"{tag}_{unit}_{frame_code}.json"
    if cache_path.exists():
        return json.loads(cache_path.read_text(encoding="utf-8"))

    url = f"https://data.sec.gov/api/xbrl/frames/us-gaap/{tag}/{unit}/{frame_code}.json"
    request = Request(url, headers={"User-Agent": user_agent, "Accept": "application/json"})
    try:
        with urlopen(request, timeout=60) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except HTTPError as exc:
        if exc.code != 404:
            raise
        # A 404 still hit the server, so throttle here too; otherwise runs of
        # missing frames are requested back-to-back with no pause.
        time.sleep(FRAME_SLEEP_SECONDS)
        return None

    cache_path.write_text(json.dumps(payload), encoding="utf-8")
    time.sleep(FRAME_SLEEP_SECONDS)
    return payload
def _frame_to_series(payload: dict | None, cik_to_ticker: dict[int, str]) -> pd.Series:
if not payload:
return pd.Series(dtype=float)
frame = pd.DataFrame(payload.get("data", []))
if frame.empty:
return pd.Series(dtype=float)
frame = frame.loc[frame["cik"].isin(cik_to_ticker)]
if frame.empty:
return pd.Series(dtype=float)
frame["ticker"] = frame["cik"].map(cik_to_ticker)
frame = frame.dropna(subset=["ticker", "val"])
frame = frame.sort_values(["ticker", "end"])
series = frame.groupby("ticker")["val"].last()
series.index.name = None
return series.astype(float)
def _combine_quarterly_panels(panels: list[pd.DataFrame]) -> pd.DataFrame:
    """Merge panels into one frame, earlier panels taking priority.

    Empty panels are ignored. The first non-empty panel is copied; each later
    one only contributes rows, columns, or cells the running result does not
    already have (``DataFrame.combine_first``). The result is sorted by index.
    With no usable panel an empty DataFrame is returned.
    """
    merged = pd.DataFrame()
    for frame in panels:
        if frame.empty:
            continue
        merged = frame.copy() if merged.empty else merged.combine_first(frame)
    return merged.sort_index()
def _empty_quarterly_panels(tickers: list[str]) -> dict[str, pd.DataFrame]:
    """Return one empty, datetime-indexed panel per configured factor name."""
    names = list(QUARTERLY_DURATION_CONCEPTS) + list(QUARTERLY_INSTANT_CONCEPTS)
    return {
        name: pd.DataFrame(index=pd.Index([], dtype="datetime64[ns]"), columns=tickers)
        for name in names
    }


def _fetch_quarter_panel(
    quarter_end: pd.Timestamp,
    concept_candidates: list[tuple[str, str]],
    instant: bool,
    tickers: list[str],
    cik_to_ticker: dict[int, str],
    tickers_by_primary: dict[str, list[str]],
    data_dir: str,
    user_agent: str,
) -> pd.DataFrame:
    """Build the one-row panel of a single factor for a single quarter."""
    panel = pd.DataFrame(index=[quarter_end], columns=tickers, dtype=float)
    frame_code = _frame_code(quarter_end, instant=instant)
    for tag, unit in concept_candidates:
        payload = _load_or_fetch_frame(
            tag=tag,
            unit=unit,
            frame_code=frame_code,
            data_dir=data_dir,
            user_agent=user_agent,
        )
        series = _frame_to_series(payload, cik_to_ticker)
        for primary, value in series.items():
            # Copy the filer-level value to every requested ticker of that
            # filer; only fill cells an earlier candidate left missing.
            for ticker in tickers_by_primary[primary]:
                if pd.isna(panel.at[quarter_end, ticker]):
                    panel.at[quarter_end, ticker] = value
    return panel


def fetch_sec_quarterly_panels(
    tickers: list[str],
    price_index: pd.Index,
    data_dir: str = "data",
    user_agent: str = DEFAULT_SEC_USER_AGENT,
) -> dict[str, pd.DataFrame]:
    """Fetch quarterly fundamentals panels for ``tickers`` from SEC frames.

    For every calendar quarter end from the year before ``price_index`` starts
    through the end of the year in which it finishes, each concept in
    ``QUARTERLY_DURATION_CONCEPTS`` and ``QUARTERLY_INSTANT_CONCEPTS`` is
    loaded (from cache or network) and turned into a one-row panel. Candidate
    tags are tried in order; later candidates only fill still-missing cells.

    Tickers that resolve to the same CIK (several tickers of one filer) all
    receive that filer's value. Previously only one of them was populated
    because the CIK -> ticker mapping can hold a single ticker per CIK.

    Args:
        tickers: Ticker labels; they become the panel columns as given.
        price_index: Datetime-like index whose min/max set the year range.
        data_dir: Root directory for the ticker-map and frame caches.
        user_agent: Value of the ``User-Agent`` header sent to the SEC.

    Returns:
        Mapping of factor name to a DataFrame indexed by quarter end with one
        column per ticker. Empty panels are returned when ``price_index`` is
        empty or none of the tickers is found in the SEC ticker map.

    Raises:
        urllib.error.URLError: If a required download fails.
    """
    # An empty index has no min/max year; bail out before any network access
    # instead of failing on int(NaT.year).
    if len(price_index) == 0:
        return _empty_quarterly_panels(tickers)

    normalized_to_original = {_normalize_ticker(t): t for t in tickers}
    ticker_map = load_sec_ticker_map(data_dir=data_dir, user_agent=user_agent)
    ticker_map = ticker_map.loc[ticker_map["ticker"].isin(list(normalized_to_original))]

    # Group requested tickers by CIK so tickers sharing a filer are kept.
    tickers_by_cik: dict[int, list[str]] = {}
    for row in ticker_map.itertuples(index=False):
        tickers_by_cik.setdefault(int(row.cik), []).append(normalized_to_original[row.ticker])
    if not tickers_by_cik:
        return _empty_quarterly_panels(tickers)

    # _frame_to_series accepts one label per CIK: use the first ticker as the
    # representative and fan its value out to the whole group afterwards.
    cik_to_ticker = {cik: names[0] for cik, names in tickers_by_cik.items()}
    tickers_by_primary = {names[0]: names for names in tickers_by_cik.values()}

    min_year = int(price_index.min().year) - 1
    max_year = int(price_index.max().year)
    quarter_ends = [
        pd.Timestamp(year=year, month=month, day=day)
        for year in range(min_year, max_year + 1)
        for month, day in ((3, 31), (6, 30), (9, 30), (12, 31))
    ]

    results: dict[str, list[pd.DataFrame]] = {
        name: [] for name in QUARTERLY_DURATION_CONCEPTS | QUARTERLY_INSTANT_CONCEPTS
    }
    concept_groups = (
        (QUARTERLY_DURATION_CONCEPTS, False),
        (QUARTERLY_INSTANT_CONCEPTS, True),
    )
    for index, quarter_end in enumerate(quarter_ends, start=1):
        print(f"--- SEC quarterly frames {index}/{len(quarter_ends)}: {quarter_end.date()} ---")
        for concepts, instant in concept_groups:
            for factor_name, concept_candidates in concepts.items():
                results[factor_name].append(
                    _fetch_quarter_panel(
                        quarter_end,
                        concept_candidates,
                        instant,
                        tickers,
                        cik_to_ticker,
                        tickers_by_primary,
                        data_dir,
                        user_agent,
                    )
                )

    return {
        name: _combine_quarterly_panels(panels).reindex(columns=tickers)
        for name, panels in results.items()
    }
def quarterly_snapshot_to_daily(quarterly_df: pd.DataFrame, daily_index: pd.Index, lag_days: int) -> pd.DataFrame:
    """Spread quarterly values onto ``daily_index`` after a fixed lag.

    Each quarterly row is moved ``lag_days`` calendar days later, then carried
    forward until the next lagged row. Days before the first lagged row are
    NaN. Lagged dates absent from ``daily_index`` still take effect on the
    next date that is present.

    Args:
        quarterly_df: Frame indexed by datetime-like period dates.
        daily_index: Target index of the result.
        lag_days: Calendar days added to every quarterly date.

    Returns:
        A frame indexed by ``daily_index`` with the columns of
        ``quarterly_df``; all-NaN float when ``quarterly_df`` is empty.
    """
    if quarterly_df.empty:
        return pd.DataFrame(index=daily_index, columns=quarterly_df.columns, dtype=float)

    lagged = quarterly_df.copy()
    lagged.index = pd.DatetimeIndex(lagged.index) + pd.Timedelta(days=lag_days)

    # Forward-fill on the union of both date sets so a lagged date that is not
    # itself in daily_index still propagates to later days.
    all_dates = set(pd.DatetimeIndex(daily_index)) | set(lagged.index)
    timeline = pd.DatetimeIndex(sorted(all_dates))
    filled = lagged.reindex(timeline).ffill()
    return filled.reindex(daily_index)
def _xsec_rank(df: pd.DataFrame, ascending: bool = True) -> pd.DataFrame:
    """Rank each row across its columns as a percentile.

    NaN cells stay NaN. With ``ascending=True`` the largest value in a row
    receives 1.0.
    """
    percentile_ranks = df.rank(
        ascending=ascending,
        axis="columns",
        na_option="keep",
        pct=True,
    )
    return percentile_ranks
def build_quarterly_factor_pack(
    quarterly_data: dict[str, pd.DataFrame],
    close: pd.DataFrame,
    lag_days: int = DEFAULT_LAG_DAYS,
) -> dict[str, pd.DataFrame]:
    """Turn quarterly fundamentals into daily factor frames plus a composite.

    Args:
        quarterly_data: Panels keyed ``"shares"``, ``"equity"``, ``"assets"``,
            ``"net_income"`` and ``"gross_profit"``, one row per quarter.
        close: Daily close prices; its index is the daily grid of the output.
        lag_days: Lag passed to ``quarterly_snapshot_to_daily``.

    Returns:
        Mapping with ``book_to_market``, ``earnings_yield``, ``roe``,
        ``gross_profitability``, ``asset_growth``, ``share_issuance`` and
        ``composite``. ``asset_growth`` and ``share_issuance`` are negated
        year-over-year changes. ``composite`` is the per-ticker mean of the
        cross-sectional percentile ranks of the six factors, shifted down by
        one row.

    Raises:
        KeyError: If a required panel is missing from ``quarterly_data``.
    """
    daily_index = close.index

    def to_daily(panel: pd.DataFrame) -> pd.DataFrame:
        return quarterly_snapshot_to_daily(panel, daily_index, lag_days)

    def nonzero(frame: pd.DataFrame) -> pd.DataFrame:
        # Zero denominators become NaN so the ratios never divide by zero.
        return frame.replace(0.0, np.nan)

    # Point-in-time levels.
    shares_daily = to_daily(quarterly_data["shares"])
    equity_daily = to_daily(quarterly_data["equity"])
    assets_daily = to_daily(quarterly_data["assets"])

    # Trailing four-quarter sums; all four quarters must be present.
    net_income_ttm_daily = to_daily(quarterly_data["net_income"].rolling(4, min_periods=4).sum())
    gross_profit_ttm_daily = to_daily(quarterly_data["gross_profit"].rolling(4, min_periods=4).sum())

    # Levels from four quarterly rows earlier, for year-over-year changes.
    assets_yoy_daily = to_daily(quarterly_data["assets"].shift(4))
    shares_yoy_daily = to_daily(quarterly_data["shares"].shift(4))

    market_cap = close * shares_daily
    factor_pack = {
        "book_to_market": equity_daily / nonzero(market_cap),
        "earnings_yield": net_income_ttm_daily / nonzero(market_cap),
        "roe": net_income_ttm_daily / nonzero(equity_daily),
        "gross_profitability": gross_profit_ttm_daily / nonzero(assets_daily),
        "asset_growth": -(assets_daily / nonzero(assets_yoy_daily) - 1.0),
        "share_issuance": -(shares_daily / nonzero(shares_yoy_daily) - 1.0),
    }

    ranked = {name: _xsec_rank(frame) for name, frame in factor_pack.items()}

    # Columns of the concatenation are (factor, ticker); grouping the
    # transpose on level 1 averages the ranks of each ticker across factors.
    stacked_ranks = pd.concat(ranked, axis=1)
    composite = stacked_ranks.T.groupby(level=1).mean().T
    factor_pack["composite"] = composite.shift(1)
    return factor_pack
def build_exploratory_fundamental_score(
    close: pd.DataFrame,
    data_dir: str = "data",
    lag_days: int = DEFAULT_LAG_DAYS,
    user_agent: str = DEFAULT_SEC_USER_AGENT,
) -> pd.DataFrame:
    """Return the composite fundamental score for the universe in ``close``.

    Fetches the quarterly SEC panels for the columns of ``close`` over the
    span of its index, builds the factor pack, and returns its
    ``"composite"`` frame.

    Args:
        close: Daily close prices, one column per ticker.
        data_dir: Root directory for the SEC caches.
        lag_days: Lag applied when mapping quarterly values to days.
        user_agent: Value of the ``User-Agent`` header sent to the SEC.
    """
    universe = list(close.columns)
    quarterly_panels = fetch_sec_quarterly_panels(
        tickers=universe,
        price_index=close.index,
        data_dir=data_dir,
        user_agent=user_agent,
    )
    factors = build_quarterly_factor_pack(quarterly_panels, close, lag_days=lag_days)
    return factors["composite"]