Files
quant/factor_attribution.py

726 lines
25 KiB
Python

from __future__ import annotations
import json
import http.client
import io
import re
import socket
import ssl
import warnings
import zipfile
from pathlib import Path
from urllib.error import URLError
from urllib.request import Request, urlopen
import numpy as np
import pandas as pd
from scipy import stats
# Daily Fama-French five-factor (2x3 sorts) CSV archive on Ken French's data library.
KEN_FRENCH_DAILY_FF5_ZIP_URL = (
    "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/"
    "F-F_Research_Data_5_Factors_2x3_daily_CSV.zip"
)
# Columns the parsed Ken French table must provide (factor returns plus the risk-free rate).
EXPECTED_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA", "RF"]
# Factor column sets for each regression model built on the external data.
CAPM_FACTOR_COLUMNS = ["MKT_RF"]
FF5_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA"]
# Locally constructed long-short extension factors (momentum, low volatility, recovery).
EXTENSION_FACTOR_COLUMNS = ["MOM", "LOWVOL", "RECOVERY"]
FF5PLUS_FACTOR_COLUMNS = FF5_FACTOR_COLUMNS + EXTENSION_FACTOR_COLUMNS
# Price-only proxy replacements for the FF5 core, used when external factors are unavailable.
PROXY_FACTOR_COLUMNS = [
    "MKT",
    "SMB_PROXY",
    "HML_PROXY",
    "RMW_PROXY",
    "CMA_PROXY",
] + EXTENSION_FACTOR_COLUMNS
# Annualization convention for daily alphas and residual volatility.
TRADING_DAYS_PER_YEAR = 252
# Placeholder column name used when no benchmark symbol is supplied.
MISSING_BENCHMARK_SENTINEL = "__missing_benchmark__"
# Maps a regression factor name (external or proxy) to its summary-table beta column.
SUMMARY_BETA_COLUMN_BY_FACTOR = {
    "MKT_RF": "beta_mkt",
    "MKT": "beta_mkt",
    "SMB": "beta_smb",
    "SMB_PROXY": "beta_smb",
    "HML": "beta_hml",
    "HML_PROXY": "beta_hml",
    "RMW": "beta_rmw",
    "RMW_PROXY": "beta_rmw",
    "CMA": "beta_cma",
    "CMA_PROXY": "beta_cma",
    "MOM": "beta_mom",
    "LOWVOL": "beta_lowvol",
    "RECOVERY": "beta_recovery",
}
# Column order of the per-regression summary table (summary.csv).
SUMMARY_COLUMNS = [
    "strategy",
    "market",
    "model",
    "factor_source",
    "proxy_only",
    "beta_semantics",
    "start_date",
    "end_date",
    "n_obs",
    "alpha_daily",
    "alpha_ann",
    "alpha_t_stat",
    "alpha_p_value",
    "r_squared",
    "adj_r_squared",
    "residual_vol_ann",
    "beta_mkt",
    "beta_smb",
    "beta_hml",
    "beta_rmw",
    "beta_cma",
    "beta_mom",
    "beta_lowvol",
    "beta_recovery",
]
# Column order of the long-format per-factor loadings table (loadings.csv).
LOADING_COLUMNS = [
    "strategy",
    "market",
    "model",
    "factor_source",
    "proxy_only",
    "factor",
    "beta",
    "t_stat",
    "p_value",
]
# The beta columns whose factor meaning depends on proxy vs. external semantics.
SEMANTIC_BETA_COLUMNS = [
    "beta_mkt",
    "beta_smb",
    "beta_hml",
    "beta_rmw",
    "beta_cma",
    "beta_mom",
    "beta_lowvol",
    "beta_recovery",
]
class ExternalFactorFormatError(ValueError):
    """Raised when the downloaded Ken French payload cannot be parsed into the expected table."""
    pass
class ExternalFactorDownloadError(OSError):
    """Raised when the external factor data cannot be downloaded (network/protocol failure)."""
    pass
def _download_kf_zip_bytes() -> bytes:
    """Download the Ken French daily FF5 ZIP archive and return its raw bytes.

    Returns:
        The undecoded bytes of the ZIP file.

    Raises:
        ExternalFactorDownloadError: If the HTTP request fails for any
            network or protocol reason.
    """
    request = Request(
        KEN_FRENCH_DAILY_FF5_ZIP_URL,
        headers={"User-Agent": "quant-factor-attribution/0.1"},
    )
    try:
        with urlopen(request, timeout=30) as response:
            return response.read()
    except (OSError, http.client.HTTPException) as exc:
        # OSError is the base of URLError, ConnectionError, TimeoutError,
        # socket.timeout/gaierror and ssl.SSLError; HTTPException covers
        # IncompleteRead and RemoteDisconnected. The previous version
        # enumerated these subclasses individually.
        raise ExternalFactorDownloadError(f"Failed to download external factor data: {exc}") from exc
def _parse_kf_daily_csv(raw_bytes: bytes) -> pd.DataFrame:
    """Parse the daily five-factor table out of a Ken French ZIP payload.

    Args:
        raw_bytes: Raw bytes of the downloaded ZIP archive.

    Returns:
        DataFrame indexed by date (index name cleared) with the
        EXPECTED_FACTOR_COLUMNS as decimal daily returns (input is in percent).

    Raises:
        ExternalFactorFormatError: On any structural problem — no CSV/TXT
            member, bad encoding, missing header or columns, no daily rows,
            invalid dates, or non-numeric values.
        zipfile.BadZipFile: If the payload is not a valid ZIP archive.
    """
    with zipfile.ZipFile(io.BytesIO(raw_bytes)) as archive:
        member_names = [
            name
            for name in archive.namelist()
            if not name.endswith("/") and name.lower().endswith((".csv", ".txt"))
        ]
        if not member_names:
            raise ExternalFactorFormatError("Ken French archive did not contain a CSV or TXT file")
        try:
            text = archive.read(member_names[0]).decode("utf-8-sig")
        except UnicodeDecodeError as exc:
            raise ExternalFactorFormatError("Ken French factor file was not valid UTF-8 text") from exc
    # Drop blank lines, then locate the header row (the file starts with prose).
    lines = [line for line in text.splitlines() if line.strip()]
    try:
        header_index = next(i for i, line in enumerate(lines) if "Mkt-RF" in line)
    except StopIteration as exc:
        raise ExternalFactorFormatError("Ken French factor file was missing the daily factor header") from exc
    table = "\n".join(lines[header_index:])
    try:
        factors = pd.read_csv(io.StringIO(table))
    except pd.errors.ParserError as exc:
        raise ExternalFactorFormatError("Ken French factor table could not be parsed") from exc
    factors = factors.rename(columns={"Mkt-RF": "MKT_RF"})
    # The first (unnamed) column holds the YYYYMMDD date labels.
    date_column = factors.columns[0]
    missing_columns = [column for column in EXPECTED_FACTOR_COLUMNS if column not in factors.columns]
    if missing_columns:
        raise ExternalFactorFormatError(
            f"Ken French factor table was missing columns: {', '.join(missing_columns)}"
        )
    # Keep only 8-digit (daily) rows; trailing annual/summary sections use other
    # date formats. BUGFIX: take an explicit copy so the date-column assignment
    # below writes to an owned frame instead of a slice of the filtered view
    # (avoids pandas SettingWithCopyWarning / silently-lost writes).
    factors = factors[factors[date_column].astype(str).str.fullmatch(r"\d{8}")].copy()
    if factors.empty:
        raise ExternalFactorFormatError("Ken French factor table did not contain daily rows")
    try:
        factors[date_column] = pd.to_datetime(factors[date_column], format="%Y%m%d")
    except ValueError as exc:
        raise ExternalFactorFormatError("Ken French factor table contained invalid dates") from exc
    factors = factors.set_index(date_column)
    factors.index.name = None
    try:
        # Values are published in percent; convert to decimal returns.
        factors = factors[EXPECTED_FACTOR_COLUMNS].astype(float) / 100.0
    except ValueError as exc:
        raise ExternalFactorFormatError("Ken French factor table contained non-numeric values") from exc
    return factors
def _warn_and_load_cached_factors(cache_path: Path, reason: str) -> pd.DataFrame:
warnings.warn(
f"Using cached data from {cache_path} because {reason}.",
UserWarning,
stacklevel=2,
)
return pd.read_csv(cache_path, index_col=0, parse_dates=True)
def load_external_us_factors(cache_dir: Path | str = "data/factors") -> pd.DataFrame:
    """Fetch daily US FF5 factors, caching to CSV with a stale-cache fallback.

    A fresh download is attempted first; on success the parsed table is written
    to the cache and returned. If the download or parse fails and a cached copy
    exists, that copy is returned with a warning; otherwise the error propagates.
    """
    cache_path = Path(cache_dir) / "ff5_us_daily.csv"
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        raw_bytes = _download_kf_zip_bytes()
    except ExternalFactorDownloadError as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(cache_path, f"download failed: {exc}")
    try:
        factors = _parse_kf_daily_csv(raw_bytes)
    except zipfile.BadZipFile as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(cache_path, f"the upstream ZIP was invalid: {exc}")
    except ExternalFactorFormatError as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(
            cache_path,
            f"the upstream factor format was invalid: {exc}",
        )
    # Refresh the cache only after a fully successful download + parse.
    factors.to_csv(cache_path)
    return factors
def _select_stock_prices(price_data: pd.DataFrame, benchmark: str) -> pd.DataFrame:
stocks = price_data.drop(columns=[benchmark], errors="ignore")
return stocks.sort_index().astype(float)
def _long_short_factor(
scores: pd.DataFrame,
returns: pd.DataFrame,
quantile: float = 0.3,
) -> pd.Series:
lagged_scores = scores.shift(1)
high_cutoff = lagged_scores.quantile(1 - quantile, axis=1)
low_cutoff = lagged_scores.quantile(quantile, axis=1)
long_mask = lagged_scores.ge(high_cutoff, axis=0)
short_mask = lagged_scores.le(low_cutoff, axis=0)
long_returns = returns.where(long_mask).mean(axis=1)
short_returns = returns.where(short_mask).mean(axis=1)
return (long_returns - short_returns).rename(None)
def build_extension_factors(
    price_data: pd.DataFrame,
    benchmark: str,
    market: str,
) -> pd.DataFrame:
    """Build the MOM / LOWVOL / RECOVERY long-short extension factors from prices.

    The ``market`` argument is accepted for interface symmetry but unused.
    """
    del market
    stocks = _select_stock_prices(price_data, benchmark)
    daily_returns = stocks.pct_change()
    # Momentum: skip the most recent 21 sessions, measure the prior 231 (12-1 style).
    momentum_signal = stocks.shift(21).pct_change(231)
    # Low volatility: negate the 60-day return std so calmer names score higher.
    low_vol_signal = -daily_returns.rolling(60, min_periods=60).std()
    # Recovery: distance above the trailing 63-day low.
    recovery_signal = stocks / stocks.rolling(63, min_periods=63).min() - 1.0
    factor_columns = {
        "MOM": _long_short_factor(momentum_signal, daily_returns),
        "LOWVOL": _long_short_factor(low_vol_signal, daily_returns),
        "RECOVERY": _long_short_factor(recovery_signal, daily_returns),
    }
    return pd.DataFrame(factor_columns, index=price_data.index)
def _positive_share(values: np.ndarray) -> float:
return float(np.mean(values > 0))
def build_proxy_core_factors(
    price_data: pd.DataFrame,
    benchmark: str,
    market: str,
) -> pd.DataFrame:
    """Build price-only stand-ins for the FF5 core factors.

    MKT uses the benchmark column's return when present, otherwise the
    equal-weight mean stock return. The ``market`` argument is unused.
    """
    del market
    stocks = _select_stock_prices(price_data, benchmark)
    daily_returns = stocks.pct_change()
    if benchmark in price_data:
        market_series = price_data[benchmark].astype(float).pct_change()
    else:
        market_series = daily_returns.mean(axis=1)
    # Size proxy: lower price level scores higher.
    size_signal = -stocks
    # Value proxy: closeness to the trailing 252-day low.
    value_signal = -(stocks / stocks.rolling(252, min_periods=252).min() - 1.0)
    # Profitability proxy: share of up days over the last 63 sessions.
    profitability_signal = daily_returns.rolling(63, min_periods=63).apply(_positive_share, raw=True)
    # Investment proxy: negated 126-day price growth (low growth scores higher).
    investment_signal = -stocks.pct_change(126)
    proxy_columns = {
        "MKT": market_series,
        "SMB_PROXY": _long_short_factor(size_signal, daily_returns),
        "HML_PROXY": _long_short_factor(value_signal, daily_returns),
        "RMW_PROXY": _long_short_factor(profitability_signal, daily_returns),
        "CMA_PROXY": _long_short_factor(investment_signal, daily_returns),
    }
    return pd.DataFrame(proxy_columns, index=price_data.index)
def prepare_factor_models(
    market: str,
    extension_factors: pd.DataFrame,
    proxy_factors: pd.DataFrame | None = None,
    external_factors: pd.DataFrame | None = None,
) -> dict[str, object]:
    """Assemble the factor frame and model definitions for one market.

    A US market with external data gets CAPM/FF5/FF5+ models on the Ken French
    factors (excess-return regressions via the RF column); every other case
    falls back to a single proxy model built from price-only factors.

    Raises:
        ValueError: If proxy factors are needed but not supplied.
    """
    uses_external = market.lower() == "us" and external_factors is not None
    if uses_external:
        combined = pd.concat([external_factors, extension_factors], axis=1)
        return {
            "factor_frame": combined,
            "models": {
                "capm": list(CAPM_FACTOR_COLUMNS),
                "ff5": list(FF5_FACTOR_COLUMNS),
                "ff5plus": list(FF5PLUS_FACTOR_COLUMNS),
            },
            "risk_free_col": "RF",
            "factor_source": "external+local",
            "proxy_only": False,
            "model_family": "standard",
        }
    if proxy_factors is None:
        raise ValueError("proxy_factors are required when external factors are unavailable")
    combined = pd.concat([proxy_factors, extension_factors], axis=1)
    return {
        "factor_frame": combined,
        "models": {"proxy": list(PROXY_FACTOR_COLUMNS)},
        "risk_free_col": None,
        "factor_source": "proxy_only",
        "proxy_only": True,
        "model_family": "proxy",
    }
def run_factor_regression(
    strategy_returns: pd.Series,
    factor_frame: pd.DataFrame,
    factor_cols: list[str],
    risk_free_col: str | None = None,
) -> dict[str, object]:
    """OLS-regress (excess) strategy returns on the selected factor columns.

    Parameters
    ----------
    strategy_returns:
        Daily strategy returns indexed by date.
    factor_frame:
        Frame containing the factor columns (and the risk-free column, if any).
    factor_cols:
        Factor column names to regress on.
    risk_free_col:
        Optional risk-free-rate column; when given it is subtracted from the
        strategy returns first (excess-return regression).

    Returns
    -------
    dict with intercept stats (daily/annualized alpha, t-stat, p-value),
    per-factor betas with t-stats and p-values, (adjusted) R^2, annualized
    residual volatility, sample start/end dates, and observation count.

    Raises
    ------
    ValueError
        If there is no overlapping sample, too few observations, or the
        design matrix is rank-deficient.
    """
    # Inner-align strategy and factor series on date; rows with any NaN drop out.
    regression_frame = pd.concat(
        [strategy_returns.rename("strategy"), factor_frame[factor_cols + ([risk_free_col] if risk_free_col else [])]],
        axis=1,
    ).dropna()
    if regression_frame.empty:
        raise ValueError("No overlapping strategy and factor observations were available for regression")
    y = regression_frame["strategy"].astype(float)
    if risk_free_col is not None:
        # Excess returns: subtract the risk-free rate before regressing.
        y = y - regression_frame[risk_free_col].astype(float)
    x = regression_frame[factor_cols].astype(float).to_numpy()
    # Prepend an intercept column so coefficients[0] is the daily alpha.
    x = np.column_stack([np.ones(len(regression_frame)), x])
    n_obs = len(regression_frame)
    param_count = x.shape[1]
    if n_obs < param_count:
        raise ValueError(
            f"Insufficient observations for regression: need at least {param_count} rows, got {n_obs}"
        )
    coefficients, _, rank, _ = np.linalg.lstsq(x, y.to_numpy(), rcond=None)
    if rank < param_count:
        raise ValueError(
            "Regression design matrix is rank-deficient; coefficients are not uniquely identified"
        )
    fitted = x @ coefficients
    residuals = y.to_numpy() - fitted
    residual_series = pd.Series(residuals, index=regression_frame.index)
    if len(residual_series) == 1:
        # One observation has no sample variance; report 0 instead of NaN.
        residual_vol_ann = 0.0
    else:
        residual_vol_ann = float(residual_series.std(ddof=1) * np.sqrt(TRADING_DAYS_PER_YEAR))
    dof = n_obs - param_count
    if dof > 0:
        # Classical OLS standard errors via residual variance and pinv(X'X).
        residual_variance = float((residuals @ residuals) / dof)
        covariance = residual_variance * np.linalg.pinv(x.T @ x)
        standard_errors = np.sqrt(np.diag(covariance))
        with np.errstate(divide="ignore", invalid="ignore"):
            # Zero standard errors yield NaN t-stats rather than inf.
            t_stats = np.divide(
                coefficients,
                standard_errors,
                out=np.full_like(coefficients, np.nan, dtype=float),
                where=standard_errors > 0,
            )
        # Two-sided p-values from the Student-t survival function.
        p_values = 2.0 * stats.t.sf(np.abs(t_stats), df=dof)
        adj_r_squared_is_defined = True
    else:
        # Exactly-determined fit: inference statistics are undefined.
        t_stats = np.full_like(coefficients, np.nan, dtype=float)
        p_values = np.full_like(coefficients, np.nan, dtype=float)
        adj_r_squared_is_defined = False
    ss_total = float(((y - y.mean()) ** 2).sum())
    ss_residual = float(np.sum(residuals**2))
    # If the dependent variable has zero variance, define R^2 as 0.
    r_squared = 1.0 - ss_residual / ss_total if ss_total else 0.0
    if adj_r_squared_is_defined:
        adj_r_squared = 1.0 - (1.0 - r_squared) * (n_obs - 1) / (n_obs - param_count)
    else:
        adj_r_squared = float("nan")
    # Index 0 is the intercept; the remaining coefficients align with factor_cols.
    factor_slice = slice(1, None)
    return {
        "alpha_daily": float(coefficients[0]),
        "alpha_ann": float(coefficients[0] * TRADING_DAYS_PER_YEAR),
        "alpha_t_stat": float(t_stats[0]),
        "alpha_p_value": float(p_values[0]),
        "betas": {name: float(value) for name, value in zip(factor_cols, coefficients[factor_slice])},
        "t_stats": {name: float(value) for name, value in zip(factor_cols, t_stats[factor_slice])},
        "p_values": {name: float(value) for name, value in zip(factor_cols, p_values[factor_slice])},
        "r_squared": float(r_squared),
        "adj_r_squared": float(adj_r_squared),
        "residual_vol_ann": residual_vol_ann,
        "start_date": regression_frame.index.min().date().isoformat(),
        "end_date": regression_frame.index.max().date().isoformat(),
        "n_obs": n_obs,
    }
def _empty_attribution_frames() -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return empty summary and loadings frames with the canonical column sets."""
    summary = pd.DataFrame(columns=SUMMARY_COLUMNS)
    loadings = pd.DataFrame(columns=LOADING_COLUMNS)
    return summary, loadings
def _select_model_names(
model_selection: str,
available_models: dict[str, list[str]],
) -> list[str]:
if model_selection == "all":
return list(available_models)
if model_selection in available_models:
return [model_selection]
return list(available_models)
def _resolve_benchmark_symbol(benchmark: str | None) -> str:
    """Map an absent benchmark to the internal sentinel symbol."""
    return MISSING_BENCHMARK_SENTINEL if benchmark is None else benchmark
def _beta_semantics_map(proxy_only: bool) -> dict[str, str]:
return {
"beta_mkt": "MKT" if proxy_only else "MKT_RF",
"beta_smb": "SMB_PROXY" if proxy_only else "SMB",
"beta_hml": "HML_PROXY" if proxy_only else "HML",
"beta_rmw": "RMW_PROXY" if proxy_only else "RMW",
"beta_cma": "CMA_PROXY" if proxy_only else "CMA",
"beta_mom": "MOM",
"beta_lowvol": "LOWVOL",
"beta_recovery": "RECOVERY",
}
def _resolve_beta_semantics(row: pd.Series) -> dict[str, str]:
    """Decode a summary row's beta-column → factor-name mapping.

    Prefers the JSON stored in the row's ``beta_semantics`` field when it is a
    well-formed dict covering exactly the semantic beta columns with non-blank
    values and unique display headers; otherwise falls back to the canonical
    mapping implied by the row's ``proxy_only`` flag.
    """
    fallback = _beta_semantics_map(bool(row.get("proxy_only", False)))
    encoded = row.get("beta_semantics")
    if not (isinstance(encoded, str) and encoded):
        return fallback
    try:
        decoded = json.loads(encoded)
    except json.JSONDecodeError:
        return fallback
    if not isinstance(decoded, dict):
        return fallback
    candidate = {str(key): str(value) for key, value in decoded.items()}
    if set(candidate) != set(SEMANTIC_BETA_COLUMNS):
        return fallback
    if not all(value.strip() for value in candidate.values()):
        return fallback
    if not _semantics_have_unique_headers(candidate):
        return fallback
    return candidate
def _beta_header_name(factor_name: str) -> str:
suffix = factor_name.strip().lower()
suffix = re.sub(r"[^a-z0-9]+", "_", suffix).strip("_")
if suffix == "mkt_rf":
suffix = "mkt"
return f"beta_{suffix}"
def _semantics_have_unique_headers(semantics: dict[str, str]) -> bool:
    """True when every semantic beta column maps to a distinct display header."""
    seen: set[str] = set()
    for column in SEMANTIC_BETA_COLUMNS:
        header = _beta_header_name(semantics[column])
        if header in seen:
            return False
        seen.add(header)
    return True
def _section_beta_header_map(semantics: dict[str, str]) -> dict[str, str]:
    """Map each summary beta column to its display header for one report section.

    Rewritten from a manual build-by-assignment loop to a dict comprehension
    (same keys, same order, same values).
    """
    return {
        beta_column: _beta_header_name(factor_name)
        for beta_column, factor_name in semantics.items()
    }
def _section_key(row: pd.Series) -> tuple[bool, tuple[tuple[str, str], ...]]:
    """Hashable grouping key for a summary row: (proxy flag, ordered semantics)."""
    semantics = _resolve_beta_semantics(row)
    ordered_semantics = tuple((column, semantics[column]) for column in SEMANTIC_BETA_COLUMNS)
    return bool(row.get("proxy_only", False)), ordered_semantics
def attribute_strategies(
    results_df: pd.DataFrame,
    benchmark_label: str,
    price_data: pd.DataFrame,
    market: str,
    model_selection: str = "all",
    benchmark: str | None = None,
    external_factors: pd.DataFrame | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Run factor-attribution regressions for every strategy column.

    Parameters
    ----------
    results_df:
        Strategy equity curves (levels) indexed by date; converted to returns here.
    benchmark_label:
        Column in ``results_df`` to skip (the benchmark itself is not attributed).
    price_data:
        Per-symbol price frame used to construct local factors.
    market:
        Market name; "us" (case-insensitive) enables external Ken French factors.
    model_selection:
        "all", or one model name; unrecognized values fall back to all models.
    benchmark:
        Benchmark symbol inside ``price_data``; None maps to a sentinel so no
        column is excluded from factor construction.
    external_factors:
        Pre-loaded external factors; when None they are downloaded for US runs.

    Returns
    -------
    (summary_df, loadings_df) with SUMMARY_COLUMNS / LOADING_COLUMNS layouts.
    Strategies or models whose regression fails are skipped with a warning.
    """
    benchmark_symbol = _resolve_benchmark_symbol(benchmark)
    extension_factors = build_extension_factors(price_data, benchmark=benchmark_symbol, market=market)
    resolved_external_factors = external_factors
    market_name = market.lower()
    # For US runs without supplied externals, try to download; degrade to proxies on failure.
    if market_name == "us" and resolved_external_factors is None:
        try:
            resolved_external_factors = load_external_us_factors()
        except (ExternalFactorDownloadError, ExternalFactorFormatError, zipfile.BadZipFile) as exc:
            warnings.warn(
                f"Falling back to proxy factor attribution because external US factors were unavailable: {exc}",
                UserWarning,
                stacklevel=2,
            )
            resolved_external_factors = None
    proxy_factors = None
    # Proxy factors are needed for non-US markets or when the external download failed.
    if market_name != "us" or resolved_external_factors is None:
        proxy_factors = build_proxy_core_factors(price_data, benchmark=benchmark_symbol, market=market)
    prepared = prepare_factor_models(
        market=market,
        extension_factors=extension_factors,
        proxy_factors=proxy_factors,
        external_factors=resolved_external_factors,
    )
    model_names = _select_model_names(model_selection, prepared["models"])
    # Equity curves -> simple returns; fill_method=None keeps gaps as NaN.
    strategy_returns = results_df.sort_index().pct_change(fill_method=None)
    if strategy_returns.empty:
        return _empty_attribution_frames()
    summary_rows: list[dict[str, object]] = []
    loading_rows: list[dict[str, object]] = []
    for strategy_name in strategy_returns.columns:
        # The benchmark column is not a strategy; skip it.
        if strategy_name == benchmark_label:
            continue
        for model_name in model_names:
            factor_cols = prepared["models"][model_name]
            try:
                regression_result = run_factor_regression(
                    strategy_returns=strategy_returns[strategy_name],
                    factor_frame=prepared["factor_frame"],
                    factor_cols=factor_cols,
                    risk_free_col=prepared["risk_free_col"],
                )
            except ValueError as exc:
                # Best effort: a failed regression skips this pair, not the whole run.
                warnings.warn(
                    f"Skipping factor attribution for {strategy_name} ({model_name}): {exc}",
                    UserWarning,
                    stacklevel=2,
                )
                continue
            summary_row: dict[str, object] = {
                "strategy": strategy_name,
                "market": market_name,
                "model": model_name,
                "factor_source": prepared["factor_source"],
                "proxy_only": prepared["proxy_only"],
                # Persist the beta-column semantics as JSON so reports can decode them later.
                "beta_semantics": json.dumps(_beta_semantics_map(bool(prepared["proxy_only"])), sort_keys=True),
                "start_date": regression_result["start_date"],
                "end_date": regression_result["end_date"],
                "n_obs": regression_result["n_obs"],
                "alpha_daily": regression_result["alpha_daily"],
                "alpha_ann": regression_result["alpha_ann"],
                "alpha_t_stat": regression_result["alpha_t_stat"],
                "alpha_p_value": regression_result["alpha_p_value"],
                "r_squared": regression_result["r_squared"],
                "adj_r_squared": regression_result["adj_r_squared"],
                "residual_vol_ann": regression_result["residual_vol_ann"],
                # Betas default to NaN; only the model's own factors are filled in below.
                "beta_mkt": np.nan,
                "beta_smb": np.nan,
                "beta_hml": np.nan,
                "beta_rmw": np.nan,
                "beta_cma": np.nan,
                "beta_mom": np.nan,
                "beta_lowvol": np.nan,
                "beta_recovery": np.nan,
            }
            for factor_name, beta in regression_result["betas"].items():
                summary_column = SUMMARY_BETA_COLUMN_BY_FACTOR.get(factor_name)
                if summary_column is not None:
                    summary_row[summary_column] = beta
                loading_rows.append(
                    {
                        "strategy": strategy_name,
                        "market": market_name,
                        "model": model_name,
                        "factor_source": prepared["factor_source"],
                        "proxy_only": prepared["proxy_only"],
                        "factor": factor_name,
                        "beta": beta,
                        "t_stat": regression_result["t_stats"][factor_name],
                        "p_value": regression_result["p_values"][factor_name],
                    }
                )
            summary_rows.append(summary_row)
    summary_df = pd.DataFrame(summary_rows, columns=SUMMARY_COLUMNS)
    loadings_df = pd.DataFrame(loading_rows, columns=LOADING_COLUMNS)
    return summary_df, loadings_df
def export_attribution(
summary_df: pd.DataFrame,
loadings_df: pd.DataFrame,
output_dir: Path | str,
) -> None:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
summary_df.to_csv(output_path / "summary.csv", index=False)
loadings_df.to_csv(output_path / "loadings.csv", index=False)
def _describe_alpha(alpha_ann: float) -> str:
if alpha_ann > 0.02:
return "positive"
if alpha_ann < -0.02:
return "negative"
return "close to flat"
def _describe_fit(r_squared: float) -> str:
if r_squared >= 0.75:
return "strong"
if r_squared >= 0.4:
return "moderate"
return "weak"
def _top_loading_descriptions(row: pd.Series, limit: int = 2) -> str:
    """Describe the ``limit`` largest-magnitude factor loadings in a summary row."""
    labels = _resolve_beta_semantics(row)
    loadings: list[tuple[str, float]] = []
    for column in row.index:
        if not column.startswith("beta_"):
            continue
        value = row.get(column)
        label = labels.get(column)
        if label is not None and pd.notna(value):
            loadings.append((label, float(value)))
    if not loadings:
        return "no material factor loadings were estimated"
    loadings.sort(key=lambda pair: abs(pair[1]), reverse=True)
    return ", ".join(f"{name} {value:.2f}" for name, value in loadings[:limit])
def _print_attribution_section(summary_df: pd.DataFrame, title: str, semantics: dict[str, str]) -> None:
    """Print one attribution table with beta columns renamed per ``semantics``."""
    identity_columns = {"strategy", "market", "model", "factor_source", "proxy_only"}
    display_columns = [
        "strategy",
        "market",
        "model",
        "factor_source",
        "proxy_only",
        "alpha_ann",
        "r_squared",
        "residual_vol_ann",
        "beta_mkt",
        "beta_smb",
        "beta_hml",
        "beta_rmw",
        "beta_cma",
        "beta_mom",
        "beta_lowvol",
        "beta_recovery",
    ]
    # reindex keeps the fixed layout even when some columns are absent.
    table = summary_df.reindex(columns=display_columns).copy()
    table = table.rename(columns=_section_beta_header_map(semantics))
    numeric_columns = [column for column in table.columns if column not in identity_columns]
    table.loc[:, numeric_columns] = table.loc[:, numeric_columns].round(4)
    print(f"\n{title}")
    print(table.to_string(index=False, na_rep=""))
def print_attribution_summary(summary_df: pd.DataFrame) -> None:
    """Print attribution tables grouped by beta semantics, plus a prose interpretation.

    Rows sharing the same (proxy flag, beta semantics) are printed together so
    every table's beta headers mean the same thing; a per-row interpretation
    of alpha, top loadings, and fit quality follows.
    """
    if summary_df.empty:
        print("Factor attribution: no usable regressions were produced.")
        return
    print("\nFactor attribution")
    # Group row positions by their section key so mixed proxy/external results
    # are printed as separate tables with correctly labeled beta columns.
    sections: dict[tuple[bool, tuple[tuple[str, str], ...]], list[int]] = {}
    for index, row in summary_df.iterrows():
        sections.setdefault(_section_key(row), []).append(index)
    for (is_proxy, semantics_items), row_indexes in sections.items():
        section_rows = summary_df.loc[row_indexes]
        title = "Proxy factor attribution" if is_proxy else "Standard factor attribution"
        _print_attribution_section(
            section_rows,
            title=title,
            semantics=dict(semantics_items),
        )
    print("\nInterpretation")
    for _, row in summary_df.iterrows():
        print(
            f"- {row['strategy']} / {row['model']}: estimated annualized alpha is "
            f"{_describe_alpha(float(row['alpha_ann']))} ({row['alpha_ann']:.2%}); "
            f"strongest loadings are {_top_loading_descriptions(row)}; "
            f"model fit looks {_describe_fit(float(row['r_squared']))} (R^2={row['r_squared']:.2f})."
        )