Add factor attribution regression engine

This commit is contained in:
2026-04-07 16:40:24 +08:00
parent 507565c556
commit f2e14ec200
2 changed files with 196 additions and 0 deletions

View File

@@ -12,6 +12,7 @@ from urllib.request import Request, urlopen
import numpy as np
import pandas as pd
from scipy import stats
KEN_FRENCH_DAILY_FF5_ZIP_URL = (
"https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/"
@@ -19,6 +20,18 @@ KEN_FRENCH_DAILY_FF5_ZIP_URL = (
)
EXPECTED_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA", "RF"]
CAPM_FACTOR_COLUMNS = ["MKT_RF"]
FF5_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA"]
EXTENSION_FACTOR_COLUMNS = ["MOM", "LOWVOL", "RECOVERY"]
FF5PLUS_FACTOR_COLUMNS = FF5_FACTOR_COLUMNS + EXTENSION_FACTOR_COLUMNS
PROXY_FACTOR_COLUMNS = [
"MKT",
"SMB_PROXY",
"HML_PROXY",
"RMW_PROXY",
"CMA_PROXY",
] + EXTENSION_FACTOR_COLUMNS
TRADING_DAYS_PER_YEAR = 252
class ExternalFactorFormatError(ValueError):
@@ -222,3 +235,107 @@ def build_proxy_core_factors(
},
index=price_data.index,
)
def prepare_factor_models(
market: str,
extension_factors: pd.DataFrame,
proxy_factors: pd.DataFrame | None = None,
external_factors: pd.DataFrame | None = None,
) -> dict[str, object]:
market_name = market.lower()
if market_name == "us" and external_factors is not None:
factor_frame = pd.concat([external_factors, extension_factors], axis=1)
return {
"factor_frame": factor_frame,
"models": {
"capm": CAPM_FACTOR_COLUMNS.copy(),
"ff5": FF5_FACTOR_COLUMNS.copy(),
"ff5plus": FF5PLUS_FACTOR_COLUMNS.copy(),
},
"risk_free_col": "RF",
"factor_source": "external+local",
"proxy_only": False,
"model_family": "standard",
}
if proxy_factors is None:
raise ValueError("proxy_factors are required when external factors are unavailable")
factor_frame = pd.concat([proxy_factors, extension_factors], axis=1)
return {
"factor_frame": factor_frame,
"models": {"proxy": PROXY_FACTOR_COLUMNS.copy()},
"risk_free_col": None,
"factor_source": "proxy_only",
"proxy_only": True,
"model_family": "proxy",
}
def run_factor_regression(
strategy_returns: pd.Series,
factor_frame: pd.DataFrame,
factor_cols: list[str],
risk_free_col: str | None = None,
) -> dict[str, object]:
regression_frame = pd.concat(
[strategy_returns.rename("strategy"), factor_frame[factor_cols + ([risk_free_col] if risk_free_col else [])]],
axis=1,
).dropna()
if regression_frame.empty:
raise ValueError("No overlapping strategy and factor observations were available for regression")
y = regression_frame["strategy"].astype(float)
if risk_free_col is not None:
y = y - regression_frame[risk_free_col].astype(float)
x = regression_frame[factor_cols].astype(float).to_numpy()
x = np.column_stack([np.ones(len(regression_frame)), x])
coefficients, _, _, _ = np.linalg.lstsq(x, y.to_numpy(), rcond=None)
fitted = x @ coefficients
residuals = y.to_numpy() - fitted
n_obs = len(regression_frame)
param_count = x.shape[1]
dof = max(n_obs - param_count, 1)
residual_variance = float((residuals @ residuals) / dof)
covariance = residual_variance * np.linalg.pinv(x.T @ x)
standard_errors = np.sqrt(np.diag(covariance))
with np.errstate(divide="ignore", invalid="ignore"):
t_stats = np.divide(
coefficients,
standard_errors,
out=np.full_like(coefficients, np.nan, dtype=float),
where=standard_errors > 0,
)
p_values = 2.0 * stats.t.sf(np.abs(t_stats), df=dof)
ss_total = float(((y - y.mean()) ** 2).sum())
ss_residual = float(np.sum(residuals**2))
r_squared = 1.0 - ss_residual / ss_total if ss_total else 0.0
if n_obs > param_count:
adj_r_squared = 1.0 - (1.0 - r_squared) * (n_obs - 1) / (n_obs - param_count)
else:
adj_r_squared = 0.0
factor_slice = slice(1, None)
residual_series = pd.Series(residuals, index=regression_frame.index)
return {
"alpha_daily": float(coefficients[0]),
"alpha_ann": float(coefficients[0] * TRADING_DAYS_PER_YEAR),
"alpha_t_stat": float(t_stats[0]),
"alpha_p_value": float(p_values[0]),
"betas": {name: float(value) for name, value in zip(factor_cols, coefficients[factor_slice])},
"t_stats": {name: float(value) for name, value in zip(factor_cols, t_stats[factor_slice])},
"p_values": {name: float(value) for name, value in zip(factor_cols, p_values[factor_slice])},
"r_squared": float(r_squared),
"adj_r_squared": float(adj_r_squared),
"residual_vol_ann": float(residual_series.std(ddof=1) * np.sqrt(TRADING_DAYS_PER_YEAR)),
"start_date": regression_frame.index.min().date().isoformat(),
"end_date": regression_frame.index.max().date().isoformat(),
"n_obs": n_obs,
}