# File: quant/factor_attribution.py (358 lines, 12 KiB, Python)
from __future__ import annotations
import http.client
import io
import socket
import ssl
import warnings
import zipfile
from pathlib import Path
from urllib.error import URLError
from urllib.request import Request, urlopen
import numpy as np
import pandas as pd
from scipy import stats
# Source URL for Ken French's daily Fama/French 5-factor (2x3) CSV archive.
KEN_FRENCH_DAILY_FF5_ZIP_URL = (
    "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/"
    "F-F_Research_Data_5_Factors_2x3_daily_CSV.zip"
)
# Columns expected in the parsed table after "Mkt-RF" is renamed to "MKT_RF".
EXPECTED_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA", "RF"]
# Factor sets for each regression model run on the external (Ken French) data.
CAPM_FACTOR_COLUMNS = ["MKT_RF"]
FF5_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA"]
# Locally constructed extension factors (built by build_extension_factors).
EXTENSION_FACTOR_COLUMNS = ["MOM", "LOWVOL", "RECOVERY"]
FF5PLUS_FACTOR_COLUMNS = FF5_FACTOR_COLUMNS + EXTENSION_FACTOR_COLUMNS
# Price-only proxy factors used when external data is unavailable
# (built by build_proxy_core_factors).
PROXY_FACTOR_COLUMNS = [
    "MKT",
    "SMB_PROXY",
    "HML_PROXY",
    "RMW_PROXY",
    "CMA_PROXY",
] + EXTENSION_FACTOR_COLUMNS
# Annualization factor applied to daily alphas and volatilities.
TRADING_DAYS_PER_YEAR = 252
class ExternalFactorFormatError(ValueError):
    """Raised when downloaded factor data cannot be parsed in the expected layout."""
    pass
class ExternalFactorDownloadError(OSError):
    """Raised when the external factor archive cannot be downloaded."""
    pass
def _download_kf_zip_bytes() -> bytes:
    """Download the Ken French daily FF5 ZIP archive and return its raw bytes.

    Raises:
        ExternalFactorDownloadError: if the request fails for any network-,
            TLS-, or HTTP-protocol-level reason.
    """
    request = Request(
        KEN_FRENCH_DAILY_FF5_ZIP_URL,
        # NOTE(review): custom User-Agent presumably needed by the upstream
        # server -- confirm before removing.
        headers={"User-Agent": "quant-factor-attribution/0.1"},
    )
    try:
        with urlopen(request, timeout=30) as response:
            return response.read()
    except (
        URLError,
        TimeoutError,
        ConnectionError,
        socket.timeout,
        socket.gaierror,
        ssl.SSLError,
        # HTTPException already covers its subclasses IncompleteRead and
        # RemoteDisconnected, so they need not be listed separately.
        http.client.HTTPException,
    ) as exc:
        raise ExternalFactorDownloadError(f"Failed to download external factor data: {exc}") from exc
def _parse_kf_daily_csv(raw_bytes: bytes) -> pd.DataFrame:
    """Parse the Ken French daily factor ZIP payload into a DataFrame.

    Args:
        raw_bytes: raw bytes of the downloaded ZIP archive.

    Returns:
        DataFrame indexed by date with the EXPECTED_FACTOR_COLUMNS columns,
        values converted from percent to decimal returns.

    Raises:
        ExternalFactorFormatError: if the archive has no CSV/TXT member, the
            text is not UTF-8, the header or expected columns are missing, no
            daily rows are present, or values cannot be parsed.
        zipfile.BadZipFile: if the bytes are not a valid ZIP archive.
    """
    with zipfile.ZipFile(io.BytesIO(raw_bytes)) as archive:
        member_names = [
            name
            for name in archive.namelist()
            if not name.endswith("/") and name.lower().endswith((".csv", ".txt"))
        ]
        if not member_names:
            raise ExternalFactorFormatError("Ken French archive did not contain a CSV or TXT file")
        try:
            # utf-8-sig strips a byte-order mark if one is present.
            text = archive.read(member_names[0]).decode("utf-8-sig")
        except UnicodeDecodeError as exc:
            raise ExternalFactorFormatError("Ken French factor file was not valid UTF-8 text") from exc
    # The file carries prose above the data; locate the column header row.
    lines = [line for line in text.splitlines() if line.strip()]
    try:
        header_index = next(i for i, line in enumerate(lines) if "Mkt-RF" in line)
    except StopIteration as exc:
        raise ExternalFactorFormatError("Ken French factor file was missing the daily factor header") from exc
    table = "\n".join(lines[header_index:])
    try:
        factors = pd.read_csv(io.StringIO(table))
    except pd.errors.ParserError as exc:
        raise ExternalFactorFormatError("Ken French factor table could not be parsed") from exc
    factors = factors.rename(columns={"Mkt-RF": "MKT_RF"})
    # The first (unnamed) column holds the YYYYMMDD date stamps.
    date_column = factors.columns[0]
    missing_columns = [column for column in EXPECTED_FACTOR_COLUMNS if column not in factors.columns]
    if missing_columns:
        raise ExternalFactorFormatError(
            f"Ken French factor table was missing columns: {', '.join(missing_columns)}"
        )
    # Keep only daily rows (8-digit stamps); footers/annual sections drop out.
    # .copy() detaches the filtered frame so the assignment below cannot
    # trigger chained-assignment warnings on legacy pandas copy semantics.
    factors = factors[factors[date_column].astype(str).str.fullmatch(r"\d{8}")].copy()
    if factors.empty:
        raise ExternalFactorFormatError("Ken French factor table did not contain daily rows")
    try:
        factors[date_column] = pd.to_datetime(factors[date_column], format="%Y%m%d")
    except ValueError as exc:
        raise ExternalFactorFormatError("Ken French factor table contained invalid dates") from exc
    factors = factors.set_index(date_column)
    factors.index.name = None
    try:
        # Ken French publishes percentages; convert to decimal returns.
        factors = factors[EXPECTED_FACTOR_COLUMNS].astype(float) / 100.0
    except ValueError as exc:
        raise ExternalFactorFormatError("Ken French factor table contained non-numeric values") from exc
    return factors
def _warn_and_load_cached_factors(cache_path: Path, reason: str) -> pd.DataFrame:
warnings.warn(
f"Using cached data from {cache_path} because {reason}.",
UserWarning,
stacklevel=2,
)
return pd.read_csv(cache_path, index_col=0, parse_dates=True)
def load_external_us_factors(cache_dir: Path | str = "data/factors") -> pd.DataFrame:
    """Fetch the daily US FF5 factors, falling back to the local cache.

    Downloads and parses the Ken French archive; on download or parse failure
    a previously cached CSV is returned (with a warning) when available,
    otherwise the original exception propagates. A successful fetch refreshes
    the cache.
    """
    cache_path = Path(cache_dir) / "ff5_us_daily.csv"
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        raw_bytes = _download_kf_zip_bytes()
    except ExternalFactorDownloadError as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(cache_path, f"download failed: {exc}")
    try:
        factors = _parse_kf_daily_csv(raw_bytes)
    except zipfile.BadZipFile as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(cache_path, f"the upstream ZIP was invalid: {exc}")
    except ExternalFactorFormatError as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(
            cache_path,
            f"the upstream factor format was invalid: {exc}",
        )
    factors.to_csv(cache_path)
    return factors
def _select_stock_prices(price_data: pd.DataFrame, benchmark: str) -> pd.DataFrame:
stocks = price_data.drop(columns=[benchmark], errors="ignore")
return stocks.sort_index().astype(float)
def _long_short_factor(
scores: pd.DataFrame,
returns: pd.DataFrame,
quantile: float = 0.3,
) -> pd.Series:
lagged_scores = scores.shift(1)
high_cutoff = lagged_scores.quantile(1 - quantile, axis=1)
low_cutoff = lagged_scores.quantile(quantile, axis=1)
long_mask = lagged_scores.ge(high_cutoff, axis=0)
short_mask = lagged_scores.le(low_cutoff, axis=0)
long_returns = returns.where(long_mask).mean(axis=1)
short_returns = returns.where(short_mask).mean(axis=1)
return (long_returns - short_returns).rename(None)
def build_extension_factors(
    price_data: pd.DataFrame,
    benchmark: str,
    market: str,
) -> pd.DataFrame:
    """Build MOM, LOWVOL, and RECOVERY long-short factors from prices.

    The `market` argument is accepted for interface symmetry with the other
    factor builders but is unused here. The result is reindexed to
    price_data's full index.
    """
    del market
    stocks = _select_stock_prices(price_data, benchmark)
    daily_returns = stocks.pct_change()
    # Momentum: trailing return over ~231 sessions, skipping the latest ~21.
    score_momentum = stocks.shift(21).pct_change(231)
    # Low volatility: negated 60-day rolling std, so calmer names score higher.
    score_low_vol = -daily_returns.rolling(60, min_periods=60).std()
    # Recovery: run-up off the trailing 63-day price low.
    score_recovery = stocks / stocks.rolling(63, min_periods=63).min() - 1.0
    factor_columns = {
        "MOM": _long_short_factor(score_momentum, daily_returns),
        "LOWVOL": _long_short_factor(score_low_vol, daily_returns),
        "RECOVERY": _long_short_factor(score_recovery, daily_returns),
    }
    return pd.DataFrame(factor_columns, index=price_data.index)
def _positive_share(values: np.ndarray) -> float:
return float(np.mean(values > 0))
def build_proxy_core_factors(
    price_data: pd.DataFrame,
    benchmark: str,
    market: str,
) -> pd.DataFrame:
    """Build price-only stand-ins for the core factors plus a market return.

    The `market` argument is accepted for interface symmetry but unused.
    The benchmark column (when present) supplies the market return;
    otherwise the equal-weighted stock return is used. The result is
    reindexed to price_data's full index.
    """
    del market
    stocks = _select_stock_prices(price_data, benchmark)
    daily_returns = stocks.pct_change()
    if benchmark in price_data:
        market_series = price_data[benchmark].astype(float).pct_change()
    else:
        # Without a benchmark column, fall back to the average stock return.
        market_series = daily_returns.mean(axis=1)
    # Size proxy: lower-priced names score higher (negated price level).
    score_size = -stocks
    # Value proxy: negated run-up off the trailing 252-day low.
    score_value = -(stocks / stocks.rolling(252, min_periods=252).min() - 1.0)
    # Profitability proxy: share of up days over the trailing 63 sessions.
    score_profitability = daily_returns.rolling(63, min_periods=63).apply(_positive_share, raw=True)
    # Investment proxy: negated 126-day price change.
    score_investment = -stocks.pct_change(126)
    proxy_columns = {
        "MKT": market_series,
        "SMB_PROXY": _long_short_factor(score_size, daily_returns),
        "HML_PROXY": _long_short_factor(score_value, daily_returns),
        "RMW_PROXY": _long_short_factor(score_profitability, daily_returns),
        "CMA_PROXY": _long_short_factor(score_investment, daily_returns),
    }
    return pd.DataFrame(proxy_columns, index=price_data.index)
def prepare_factor_models(
    market: str,
    extension_factors: pd.DataFrame,
    proxy_factors: pd.DataFrame | None = None,
    external_factors: pd.DataFrame | None = None,
) -> dict[str, object]:
    """Assemble the combined factor frame and model definitions for a market.

    A US market with external factors available yields the standard
    CAPM/FF5/FF5+ model set; any other case falls back to a single model
    built from the price-based proxy factors.

    Raises:
        ValueError: when the proxy path is taken but proxy_factors is None.
    """
    use_external = market.lower() == "us" and external_factors is not None
    if use_external:
        return {
            "factor_frame": pd.concat([external_factors, extension_factors], axis=1),
            "models": {
                "capm": CAPM_FACTOR_COLUMNS.copy(),
                "ff5": FF5_FACTOR_COLUMNS.copy(),
                "ff5plus": FF5PLUS_FACTOR_COLUMNS.copy(),
            },
            "risk_free_col": "RF",
            "factor_source": "external+local",
            "proxy_only": False,
            "model_family": "standard",
        }
    if proxy_factors is None:
        raise ValueError("proxy_factors are required when external factors are unavailable")
    return {
        "factor_frame": pd.concat([proxy_factors, extension_factors], axis=1),
        "models": {"proxy": PROXY_FACTOR_COLUMNS.copy()},
        "risk_free_col": None,
        "factor_source": "proxy_only",
        "proxy_only": True,
        "model_family": "proxy",
    }
def run_factor_regression(
    strategy_returns: pd.Series,
    factor_frame: pd.DataFrame,
    factor_cols: list[str],
    risk_free_col: str | None = None,
) -> dict[str, object]:
    """OLS regression of daily (excess) strategy returns on factor returns.

    Parameters
    ----------
    strategy_returns : pd.Series
        Daily strategy returns indexed by date.
    factor_frame : pd.DataFrame
        Frame holding ``factor_cols`` (and optionally ``risk_free_col``)
        on a compatible date index.
    factor_cols : list[str]
        Factor columns to regress on.
    risk_free_col : str | None
        When given, this column is subtracted from the strategy returns so
        the regression runs on excess returns.

    Returns
    -------
    dict with daily/annualized alpha and its t-stat and p-value, per-factor
    betas/t-stats/p-values, R^2 and adjusted R^2, annualized residual
    volatility, sample start/end dates, and the observation count.

    Raises
    ------
    ValueError
        When no overlapping observations exist, when there are fewer rows
        than parameters, or when the design matrix is rank-deficient.
    """
    # Use `is not None` (not truthiness) so the column selection here stays
    # consistent with the excess-return branch below even for falsy-but-valid
    # column names; the original mixed the two checks.
    extra_cols = [risk_free_col] if risk_free_col is not None else []
    # Align strategy and factors on dates where every column is populated.
    regression_frame = pd.concat(
        [strategy_returns.rename("strategy"), factor_frame[factor_cols + extra_cols]],
        axis=1,
    ).dropna()
    if regression_frame.empty:
        raise ValueError("No overlapping strategy and factor observations were available for regression")
    y = regression_frame["strategy"].astype(float)
    if risk_free_col is not None:
        # Regress excess returns when a risk-free rate is supplied.
        y = y - regression_frame[risk_free_col].astype(float)
    x = regression_frame[factor_cols].astype(float).to_numpy()
    # Prepend an intercept column; coefficient 0 is therefore the daily alpha.
    x = np.column_stack([np.ones(len(regression_frame)), x])
    n_obs = len(regression_frame)
    param_count = x.shape[1]
    if n_obs < param_count:
        raise ValueError(
            f"Insufficient observations for regression: need at least {param_count} rows, got {n_obs}"
        )
    coefficients, _, rank, _ = np.linalg.lstsq(x, y.to_numpy(), rcond=None)
    if rank < param_count:
        raise ValueError(
            "Regression design matrix is rank-deficient; coefficients are not uniquely identified"
        )
    fitted = x @ coefficients
    residuals = y.to_numpy() - fitted
    residual_series = pd.Series(residuals, index=regression_frame.index)
    residual_vol_ann = float(residual_series.std(ddof=1) * np.sqrt(TRADING_DAYS_PER_YEAR))
    dof = n_obs - param_count
    if dof > 0:
        # Classical OLS inference: sigma^2 * (X'X)^-1 covariance, two-sided
        # t-test p-values. pinv guards against near-singular X'X.
        residual_variance = float((residuals @ residuals) / dof)
        covariance = residual_variance * np.linalg.pinv(x.T @ x)
        standard_errors = np.sqrt(np.diag(covariance))
        with np.errstate(divide="ignore", invalid="ignore"):
            t_stats = np.divide(
                coefficients,
                standard_errors,
                out=np.full_like(coefficients, np.nan, dtype=float),
                where=standard_errors > 0,
            )
        p_values = 2.0 * stats.t.sf(np.abs(t_stats), df=dof)
        adj_r_squared_is_defined = True
    else:
        # Saturated fit (n_obs == param_count): inference is undefined.
        t_stats = np.full_like(coefficients, np.nan, dtype=float)
        p_values = np.full_like(coefficients, np.nan, dtype=float)
        adj_r_squared_is_defined = False
    ss_total = float(((y - y.mean()) ** 2).sum())
    ss_residual = float(np.sum(residuals**2))
    # Guard against a constant target (zero total variation).
    r_squared = 1.0 - ss_residual / ss_total if ss_total else 0.0
    if adj_r_squared_is_defined:
        adj_r_squared = 1.0 - (1.0 - r_squared) * (n_obs - 1) / (n_obs - param_count)
    else:
        adj_r_squared = float("nan")
    # Slice past the intercept to pair factor names with their statistics.
    factor_slice = slice(1, None)
    return {
        "alpha_daily": float(coefficients[0]),
        "alpha_ann": float(coefficients[0] * TRADING_DAYS_PER_YEAR),
        "alpha_t_stat": float(t_stats[0]),
        "alpha_p_value": float(p_values[0]),
        "betas": {name: float(value) for name, value in zip(factor_cols, coefficients[factor_slice])},
        "t_stats": {name: float(value) for name, value in zip(factor_cols, t_stats[factor_slice])},
        "p_values": {name: float(value) for name, value in zip(factor_cols, p_values[factor_slice])},
        "r_squared": float(r_squared),
        "adj_r_squared": float(adj_r_squared),
        "residual_vol_ann": residual_vol_ann,
        "start_date": regression_frame.index.min().date().isoformat(),
        "end_date": regression_frame.index.max().date().isoformat(),
        "n_obs": n_obs,
    }