Add 28 research scripts covering DCA simulation, momentum evaluation, Sharpe optimization, trend rider analysis, and US fundamentals exploration.
274 lines
11 KiB
Python
274 lines
11 KiB
Python
import json
|
|
import time
|
|
from pathlib import Path
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
|
|
# User-Agent header value sent with every request this module makes to the SEC.
DEFAULT_SEC_USER_AGENT: str = "quant-research/0.1 gahow@example.com"

# Default number of calendar days added to a quarter-end date before its
# figures are treated as available on the daily timeline.
DEFAULT_LAG_DAYS: int = 60

# Pause, in seconds, applied after downloading a frame from the SEC API.
FRAME_SLEEP_SECONDS: float = 0.2
|
|
|
|
# Factor name -> ordered list of (us-gaap tag, unit) candidates.
# Candidates are tried in order when panels are built; a later candidate only
# fills tickers that earlier candidates left empty.

# Concepts requested with a plain quarter frame code (no "I" suffix).
QUARTERLY_DURATION_CONCEPTS: dict[str, list[tuple[str, str]]] = {
    "net_income": [
        ("NetIncomeLoss", "USD"),
        ("ProfitLoss", "USD"),
    ],
    "gross_profit": [
        ("GrossProfit", "USD"),
    ],
}

# Concepts requested with an instant frame code ("I" suffix).
QUARTERLY_INSTANT_CONCEPTS: dict[str, list[tuple[str, str]]] = {
    "equity": [
        ("StockholdersEquityIncludingPortionAttributableToNoncontrollingInterest", "USD"),
        ("StockholdersEquity", "USD"),
    ],
    "assets": [
        ("Assets", "USD"),
    ],
    "shares": [
        ("CommonStockSharesOutstanding", "shares"),
        ("EntityCommonStockSharesOutstanding", "shares"),
    ],
}
|
|
|
|
|
|
def _normalize_ticker(ticker: str) -> str:
|
|
return ticker.upper().replace(".", "-")
|
|
|
|
|
|
def _frame_code(period_end: pd.Timestamp, instant: bool) -> str:
|
|
quarter = ((period_end.month - 1) // 3) + 1
|
|
suffix = "I" if instant else ""
|
|
return f"CY{period_end.year}Q{quarter}{suffix}"
|
|
|
|
|
|
def _cache_dir(data_dir: str) -> Path:
|
|
path = Path(data_dir) / "sec_frames"
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
return path
|
|
|
|
|
|
def load_sec_ticker_map(data_dir: str = "data", user_agent: str = DEFAULT_SEC_USER_AGENT) -> pd.DataFrame:
    """Load the SEC ticker-to-CIK table, downloading and caching it on first use.

    The raw JSON is read from ``<data_dir>/sec_company_tickers.json`` when that
    file exists; otherwise it is fetched from sec.gov and written there.

    Args:
        data_dir: Directory holding the cached JSON file.
        user_agent: Value of the ``User-Agent`` header for the download.

    Returns:
        A DataFrame with columns ``ticker`` (normalized), ``cik`` (int) and
        ``title``, one row per ticker (first occurrence kept), sorted by
        ticker with a fresh integer index. The frame is empty, but still has
        these columns, when the mapping contains no entries.

    Raises:
        urllib.error.URLError: If the download fails (includes ``HTTPError``).
    """
    cache_path = Path(data_dir) / "sec_company_tickers.json"
    if cache_path.exists():
        raw = json.loads(cache_path.read_text(encoding="utf-8"))
    else:
        request = Request(
            "https://www.sec.gov/files/company_tickers.json",
            headers={"User-Agent": user_agent, "Accept": "application/json"},
        )
        with urlopen(request, timeout=30) as response:
            raw = json.loads(response.read().decode("utf-8"))
        # The data directory may not exist yet on a first run; without this
        # the write fails after the download has already succeeded.
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        cache_path.write_text(json.dumps(raw), encoding="utf-8")

    rows = [
        {
            "ticker": _normalize_ticker(item["ticker"]),
            "cik": int(item["cik_str"]),
            "title": item["title"],
        }
        for item in raw.values()
    ]
    # Explicit columns keep the frame well-formed even when ``rows`` is empty,
    # so the de-duplication and sort below never hit a missing column.
    table = pd.DataFrame(rows, columns=["ticker", "cik", "title"])
    return table.drop_duplicates(subset=["ticker"]).sort_values("ticker").reset_index(drop=True)
|
|
|
|
|
|
def _load_or_fetch_frame(
    tag: str,
    unit: str,
    frame_code: str,
    data_dir: str = "data",
    user_agent: str = DEFAULT_SEC_USER_AGENT,
) -> dict | None:
    """Return one SEC XBRL frame payload, using an on-disk JSON cache.

    Args:
        tag: us-gaap concept tag, used in the URL and the cache file name.
        unit: Unit segment of the URL (for example ``"USD"``).
        frame_code: Period code of the frame (for example ``"CY2020Q1I"``).
        data_dir: Directory under which the ``sec_frames`` cache lives.
        user_agent: Value of the ``User-Agent`` header for the download.

    Returns:
        The decoded JSON payload, or ``None`` when the server answers 404.
        A 404 is not cached, so the frame is requested again on the next call.

    Raises:
        urllib.error.HTTPError: For any HTTP error status other than 404.
        urllib.error.URLError: For other network failures.
    """
    cache_path = _cache_dir(data_dir) / f"{tag}_{unit}_{frame_code}.json"
    if cache_path.exists():
        return json.loads(cache_path.read_text(encoding="utf-8"))

    url = f"https://data.sec.gov/api/xbrl/frames/us-gaap/{tag}/{unit}/{frame_code}.json"
    request = Request(url, headers={"User-Agent": user_agent, "Accept": "application/json"})
    try:
        with urlopen(request, timeout=60) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except HTTPError as exc:
        if exc.code == 404:
            return None
        raise
    else:
        cache_path.write_text(json.dumps(payload), encoding="utf-8")
        return payload
    finally:
        # Throttle after every network round trip, including 404s and errors,
        # so a run of missing frames does not hit the server back-to-back.
        time.sleep(FRAME_SLEEP_SECONDS)
|
|
|
|
|
|
def _frame_to_series(payload: dict | None, cik_to_ticker: dict[int, str]) -> pd.Series:
|
|
if not payload:
|
|
return pd.Series(dtype=float)
|
|
frame = pd.DataFrame(payload.get("data", []))
|
|
if frame.empty:
|
|
return pd.Series(dtype=float)
|
|
|
|
frame = frame.loc[frame["cik"].isin(cik_to_ticker)]
|
|
if frame.empty:
|
|
return pd.Series(dtype=float)
|
|
|
|
frame["ticker"] = frame["cik"].map(cik_to_ticker)
|
|
frame = frame.dropna(subset=["ticker", "val"])
|
|
frame = frame.sort_values(["ticker", "end"])
|
|
series = frame.groupby("ticker")["val"].last()
|
|
series.index.name = None
|
|
return series.astype(float)
|
|
|
|
|
|
def _combine_quarterly_panels(panels: list[pd.DataFrame]) -> pd.DataFrame:
|
|
combined = pd.DataFrame()
|
|
for panel in panels:
|
|
if panel.empty:
|
|
continue
|
|
if combined.empty:
|
|
combined = panel.copy()
|
|
continue
|
|
combined = combined.combine_first(panel)
|
|
return combined.sort_index()
|
|
|
|
|
|
def _build_quarter_panel(
    quarter_end: pd.Timestamp,
    concept_candidates: list[tuple[str, str]],
    instant: bool,
    tickers: list[str],
    cik_to_ticker: dict[int, str],
    data_dir: str,
    user_agent: str,
) -> pd.DataFrame:
    """Build a one-row panel for ``quarter_end`` from the candidate concepts, first reported value per ticker wins."""
    panel = pd.DataFrame(index=[quarter_end], columns=tickers, dtype=float)
    frame_code = _frame_code(quarter_end, instant=instant)
    for tag, unit in concept_candidates:
        payload = _load_or_fetch_frame(
            tag=tag,
            unit=unit,
            frame_code=frame_code,
            data_dir=data_dir,
            user_agent=user_agent,
        )
        series = _frame_to_series(payload, cik_to_ticker)
        for ticker, value in series.items():
            # Later candidates only fill cells earlier candidates left empty.
            if pd.isna(panel.at[quarter_end, ticker]):
                panel.at[quarter_end, ticker] = value
    return panel


def fetch_sec_quarterly_panels(
    tickers: list[str],
    price_index: pd.Index,
    data_dir: str = "data",
    user_agent: str = DEFAULT_SEC_USER_AGENT,
) -> dict[str, pd.DataFrame]:
    """Fetch quarterly fundamentals for ``tickers`` from the SEC frames API.

    Calendar quarter ends are generated from the year before the first date in
    ``price_index`` through the end of the year of its last date. For each
    quarter and each factor in ``QUARTERLY_DURATION_CONCEPTS`` and
    ``QUARTERLY_INSTANT_CONCEPTS`` the candidate frames are loaded (from cache
    or network) and merged.

    Args:
        tickers: Ticker labels; they become the panel columns unchanged.
        price_index: Dates of the price history; only its minimum and maximum
            are used, to choose the range of years.
        data_dir: Directory used for the ticker map and frame caches.
        user_agent: Value of the ``User-Agent`` header for SEC requests.

    Returns:
        Factor name -> DataFrame indexed by quarter end with one column per
        ticker. Every factor maps to an empty frame (columns only) when none
        of the tickers is found in the SEC ticker map or when ``price_index``
        is empty.
    """
    concept_groups = (
        (QUARTERLY_DURATION_CONCEPTS, False),
        (QUARTERLY_INSTANT_CONCEPTS, True),
    )
    factor_names = [name for concepts, _ in concept_groups for name in concepts]

    def empty_result() -> dict[str, pd.DataFrame]:
        return {
            name: pd.DataFrame(index=pd.Index([], dtype="datetime64[ns]"), columns=tickers)
            for name in factor_names
        }

    # Without any dates there is no year range to query; bail out before
    # touching the network instead of failing on a NaT year below.
    if len(price_index) == 0:
        return empty_result()

    normalized_to_original = {_normalize_ticker(t): t for t in tickers}
    ticker_map = load_sec_ticker_map(data_dir=data_dir, user_agent=user_agent)
    ticker_map = ticker_map.loc[ticker_map["ticker"].isin(normalized_to_original)]
    cik_to_ticker = {
        int(row.cik): normalized_to_original[row.ticker]
        for row in ticker_map.itertuples(index=False)
    }
    if not cik_to_ticker:
        return empty_result()

    # Start one year early so trailing-four-quarter and year-over-year values
    # can already be formed at the start of the price history.
    min_year = int(price_index.min().year) - 1
    max_year = int(price_index.max().year)
    quarter_ends = [
        pd.Timestamp(year=year, month=month, day=day)
        for year in range(min_year, max_year + 1)
        for month, day in ((3, 31), (6, 30), (9, 30), (12, 31))
    ]

    results: dict[str, list[pd.DataFrame]] = {name: [] for name in factor_names}
    for index, quarter_end in enumerate(quarter_ends, start=1):
        print(f"--- SEC quarterly frames {index}/{len(quarter_ends)}: {quarter_end.date()} ---")
        for concepts, instant in concept_groups:
            for factor_name, concept_candidates in concepts.items():
                results[factor_name].append(
                    _build_quarter_panel(
                        quarter_end,
                        concept_candidates,
                        instant,
                        tickers,
                        cik_to_ticker,
                        data_dir,
                        user_agent,
                    )
                )

    return {name: _combine_quarterly_panels(panels).reindex(columns=tickers) for name, panels in results.items()}
|
|
|
|
|
|
def quarterly_snapshot_to_daily(quarterly_df: pd.DataFrame, daily_index: pd.Index, lag_days: int) -> pd.DataFrame:
    """Spread quarterly rows onto ``daily_index`` after a fixed delay.

    Each quarterly row is moved ``lag_days`` calendar days later and then
    carried forward until the next moved row. Dates in ``daily_index`` that
    fall before the first moved row are NaN. An empty ``quarterly_df`` gives
    an all-NaN float frame with the same columns.
    """
    if quarterly_df.empty:
        return pd.DataFrame(index=daily_index, columns=quarterly_df.columns, dtype=float)

    lagged = quarterly_df.copy()
    lagged.index = pd.DatetimeIndex(lagged.index) + pd.Timedelta(days=lag_days)

    # Forward-fill on the merged timeline so a lagged date that is not itself
    # in ``daily_index`` still reaches the following daily dates.
    all_dates = set(pd.DatetimeIndex(daily_index)) | set(lagged.index)
    timeline = pd.DatetimeIndex(sorted(all_dates))
    carried_forward = lagged.reindex(timeline).ffill()
    return carried_forward.reindex(daily_index)
|
|
|
|
|
|
def _xsec_rank(df: pd.DataFrame, ascending: bool = True) -> pd.DataFrame:
|
|
return df.rank(axis=1, pct=True, na_option="keep", ascending=ascending)
|
|
|
|
|
|
def build_quarterly_factor_pack(
    quarterly_data: dict[str, pd.DataFrame],
    close: pd.DataFrame,
    lag_days: int = DEFAULT_LAG_DAYS,
) -> dict[str, pd.DataFrame]:
    """Turn quarterly fundamentals into daily factor frames plus a composite.

    Args:
        quarterly_data: Quarterly panels keyed by ``"shares"``, ``"equity"``,
            ``"assets"``, ``"net_income"`` and ``"gross_profit"``.
        close: Daily close prices; its index is the daily timeline.
        lag_days: Calendar-day delay applied to every quarterly panel before
            it is spread onto the daily timeline.

    Returns:
        A dict with the raw factors ``book_to_market``, ``earnings_yield``,
        ``roe``, ``gross_profitability``, ``asset_growth`` and
        ``share_issuance`` (the last two are negated growth rates against the
        value four quarters earlier), plus ``composite``: the per-ticker mean
        of the six cross-sectional percentile ranks, shifted down one row.
        Only ``composite`` is shifted; the raw factors are not.
    """
    trading_days = close.index

    def to_daily(panel: pd.DataFrame) -> pd.DataFrame:
        return quarterly_snapshot_to_daily(panel, trading_days, lag_days)

    def zero_to_nan(frame: pd.DataFrame) -> pd.DataFrame:
        # Zero denominators become NaN so ratios are missing rather than inf.
        return frame.replace(0.0, np.nan)

    shares_q = quarterly_data["shares"]
    equity_q = quarterly_data["equity"]
    assets_q = quarterly_data["assets"]
    net_income_q = quarterly_data["net_income"]
    gross_profit_q = quarterly_data["gross_profit"]

    shares = to_daily(shares_q)
    equity = to_daily(equity_q)
    assets = to_daily(assets_q)

    # Trailing sums need all four quarters present; year-ago levels are the
    # value four rows earlier in the quarterly panel.
    net_income_ttm = to_daily(net_income_q.rolling(4, min_periods=4).sum())
    gross_profit_ttm = to_daily(gross_profit_q.rolling(4, min_periods=4).sum())
    assets_year_ago = to_daily(assets_q.shift(4))
    shares_year_ago = to_daily(shares_q.shift(4))

    market_cap = zero_to_nan(close * shares)

    factor_pack = {
        "book_to_market": equity / market_cap,
        "earnings_yield": net_income_ttm / market_cap,
        "roe": net_income_ttm / zero_to_nan(equity),
        "gross_profitability": gross_profit_ttm / zero_to_nan(assets),
        "asset_growth": -(assets / zero_to_nan(assets_year_ago) - 1.0),
        "share_issuance": -(shares / zero_to_nan(shares_year_ago) - 1.0),
    }

    ranked = {name: _xsec_rank(values) for name, values in factor_pack.items()}
    # Columns of the concatenation are (factor, ticker); grouping the
    # transposed frame on level 1 averages the factor ranks per ticker.
    stacked = pd.concat(ranked, axis=1)
    composite = stacked.T.groupby(level=1).mean().T
    factor_pack["composite"] = composite.shift(1)
    return factor_pack
|
|
|
|
|
|
def build_exploratory_fundamental_score(
    close: pd.DataFrame,
    data_dir: str = "data",
    lag_days: int = DEFAULT_LAG_DAYS,
    user_agent: str = DEFAULT_SEC_USER_AGENT,
) -> pd.DataFrame:
    """Fetch SEC quarterly data for the columns of ``close`` and return the composite score.

    Args:
        close: Daily close prices; columns are the tickers to fetch and the
            index supplies the date range and daily timeline.
        data_dir: Directory used for the SEC download caches.
        lag_days: Calendar-day delay applied to quarterly figures.
        user_agent: Value of the ``User-Agent`` header for SEC requests.

    Returns:
        The ``"composite"`` frame produced by ``build_quarterly_factor_pack``.
    """
    universe = list(close.columns)
    quarterly_panels = fetch_sec_quarterly_panels(
        tickers=universe,
        price_index=close.index,
        data_dir=data_dir,
        user_agent=user_agent,
    )
    factors = build_quarterly_factor_pack(quarterly_panels, close, lag_days=lag_days)
    return factors["composite"]
|