# Factor attribution utilities: external/proxy factor construction and OLS regressions.
from __future__ import annotations
|
|
|
|
import http.client
|
|
import io
|
|
import socket
|
|
import ssl
|
|
import warnings
|
|
import zipfile
|
|
from pathlib import Path
|
|
from urllib.error import URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy import stats
|
|
|
|
# Upstream source of the daily Fama-French five-factor data (a ZIP wrapping one CSV).
KEN_FRENCH_DAILY_FF5_ZIP_URL = (
    "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/"
    "F-F_Research_Data_5_Factors_2x3_daily_CSV.zip"
)

# Columns the parsed Ken French table must provide (five factors plus risk-free rate).
EXPECTED_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA", "RF"]
# Regressor sets for each supported model.
CAPM_FACTOR_COLUMNS = ["MKT_RF"]
FF5_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA"]
# Locally constructed long-short extension factors (built by build_extension_factors).
EXTENSION_FACTOR_COLUMNS = ["MOM", "LOWVOL", "RECOVERY"]
FF5PLUS_FACTOR_COLUMNS = FF5_FACTOR_COLUMNS + EXTENSION_FACTOR_COLUMNS
# Price-only proxy model used when external factor data is unavailable.
PROXY_FACTOR_COLUMNS = [
    "MKT",
    "SMB_PROXY",
    "HML_PROXY",
    "RMW_PROXY",
    "CMA_PROXY",
] + EXTENSION_FACTOR_COLUMNS
# Annualization factor applied to daily alphas and volatilities.
TRADING_DAYS_PER_YEAR = 252
# Placeholder column name used when the caller supplies no benchmark symbol.
MISSING_BENCHMARK_SENTINEL = "__missing_benchmark__"
# Maps a regression factor name to the summary-table beta column it fills.
SUMMARY_BETA_COLUMN_BY_FACTOR = {
    "MKT_RF": "beta_mkt",
    "MKT": "beta_mkt",
    "SMB": "beta_smb",
    "SMB_PROXY": "beta_smb_proxy",
    "HML": "beta_hml",
    "HML_PROXY": "beta_hml_proxy",
    "RMW": "beta_rmw",
    "RMW_PROXY": "beta_rmw_proxy",
    "CMA": "beta_cma",
    "CMA_PROXY": "beta_cma_proxy",
    "MOM": "beta_mom",
    "LOWVOL": "beta_lowvol",
    "RECOVERY": "beta_recovery",
}
# Output column order for the attribution summary table.
SUMMARY_COLUMNS = [
    "strategy",
    "market",
    "model",
    "factor_source",
    "proxy_only",
    "start_date",
    "end_date",
    "n_obs",
    "alpha_daily",
    "alpha_ann",
    "alpha_t_stat",
    "alpha_p_value",
    "r_squared",
    "adj_r_squared",
    "residual_vol_ann",
    "beta_mkt",
    "beta_smb",
    "beta_hml",
    "beta_rmw",
    "beta_cma",
    "beta_smb_proxy",
    "beta_hml_proxy",
    "beta_rmw_proxy",
    "beta_cma_proxy",
    "beta_mom",
    "beta_lowvol",
    "beta_recovery",
]
# Output column order for the per-factor loadings table.
LOADING_COLUMNS = [
    "strategy",
    "market",
    "model",
    "factor_source",
    "proxy_only",
    "factor",
    "beta",
    "t_stat",
    "p_value",
]
|
|
|
|
|
|
class ExternalFactorFormatError(ValueError):
    """Raised when downloaded factor data does not match the expected layout."""

    pass
|
|
|
|
|
|
class ExternalFactorDownloadError(OSError):
    """Raised when the external factor data could not be fetched over the network."""

    pass
|
|
|
|
|
|
def _download_kf_zip_bytes() -> bytes:
    """Fetch the Ken French daily FF5 ZIP archive and return its raw bytes.

    Raises:
        ExternalFactorDownloadError: on any network, TLS, or HTTP-protocol
            failure (the original exception is chained as the cause).
    """
    transport_errors = (
        URLError,
        TimeoutError,
        ConnectionError,
        socket.timeout,
        socket.gaierror,
        ssl.SSLError,
        http.client.HTTPException,
        http.client.IncompleteRead,
        http.client.RemoteDisconnected,
    )
    request = Request(
        KEN_FRENCH_DAILY_FF5_ZIP_URL,
        headers={"User-Agent": "quant-factor-attribution/0.1"},
    )
    try:
        with urlopen(request, timeout=30) as response:
            payload = response.read()
    except transport_errors as exc:
        raise ExternalFactorDownloadError(f"Failed to download external factor data: {exc}") from exc
    return payload
|
|
|
|
|
|
def _parse_kf_daily_csv(raw_bytes: bytes) -> pd.DataFrame:
    """Parse the Ken French daily-factor ZIP payload into a DataFrame.

    Returns a DataFrame indexed by date containing EXPECTED_FACTOR_COLUMNS
    as decimal returns (the upstream file quotes percentages).

    Raises:
        ExternalFactorFormatError: if the archive contents, header, dates, or
            values do not match the expected Ken French layout.
        zipfile.BadZipFile: if ``raw_bytes`` is not a valid ZIP archive.
    """
    with zipfile.ZipFile(io.BytesIO(raw_bytes)) as archive:
        member_names = [
            name
            for name in archive.namelist()
            if not name.endswith("/") and name.lower().endswith((".csv", ".txt"))
        ]
        if not member_names:
            raise ExternalFactorFormatError("Ken French archive did not contain a CSV or TXT file")

        try:
            # utf-8-sig strips a BOM if the file carries one.
            text = archive.read(member_names[0]).decode("utf-8-sig")
        except UnicodeDecodeError as exc:
            raise ExternalFactorFormatError("Ken French factor file was not valid UTF-8 text") from exc

    lines = [line for line in text.splitlines() if line.strip()]
    try:
        # The file starts with free-form preamble; the data header is the
        # first line mentioning the market factor column.
        header_index = next(i for i, line in enumerate(lines) if "Mkt-RF" in line)
    except StopIteration as exc:
        raise ExternalFactorFormatError("Ken French factor file was missing the daily factor header") from exc

    table = "\n".join(lines[header_index:])
    try:
        factors = pd.read_csv(io.StringIO(table))
    except pd.errors.ParserError as exc:
        raise ExternalFactorFormatError("Ken French factor table could not be parsed") from exc

    factors = factors.rename(columns={"Mkt-RF": "MKT_RF"})
    # The date column is unnamed in the source file, so address it positionally.
    date_column = factors.columns[0]
    missing_columns = [column for column in EXPECTED_FACTOR_COLUMNS if column not in factors.columns]
    if missing_columns:
        raise ExternalFactorFormatError(
            f"Ken French factor table was missing columns: {', '.join(missing_columns)}"
        )

    # Keep only daily rows (YYYYMMDD dates); the source file also appends
    # non-daily summary sections.  .copy() makes the filtered frame an
    # independent object so the date assignment below cannot raise
    # SettingWithCopyWarning.
    factors = factors[factors[date_column].astype(str).str.fullmatch(r"\d{8}")].copy()
    if factors.empty:
        raise ExternalFactorFormatError("Ken French factor table did not contain daily rows")

    try:
        factors[date_column] = pd.to_datetime(factors[date_column], format="%Y%m%d")
    except ValueError as exc:
        raise ExternalFactorFormatError("Ken French factor table contained invalid dates") from exc

    factors = factors.set_index(date_column)
    factors.index.name = None
    try:
        # Upstream values are percentages; convert to decimal returns.
        factors = factors[EXPECTED_FACTOR_COLUMNS].astype(float) / 100.0
    except ValueError as exc:
        raise ExternalFactorFormatError("Ken French factor table contained non-numeric values") from exc

    return factors
|
|
|
|
|
|
def _warn_and_load_cached_factors(cache_path: Path, reason: str) -> pd.DataFrame:
|
|
warnings.warn(
|
|
f"Using cached data from {cache_path} because {reason}.",
|
|
UserWarning,
|
|
stacklevel=2,
|
|
)
|
|
return pd.read_csv(cache_path, index_col=0, parse_dates=True)
|
|
|
|
|
|
def load_external_us_factors(cache_dir: Path | str = "data/factors") -> pd.DataFrame:
    """Download, parse, and cache the daily US FF5 factors.

    On a download or format failure the cached CSV is returned (with a
    warning) when it exists; otherwise the original exception propagates.
    A successful parse refreshes the cache.
    """
    cache_path = Path(cache_dir) / "ff5_us_daily.csv"
    cache_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        raw_bytes = _download_kf_zip_bytes()
    except ExternalFactorDownloadError as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(cache_path, f"download failed: {exc}")

    try:
        factors = _parse_kf_daily_csv(raw_bytes)
    except zipfile.BadZipFile as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(cache_path, f"the upstream ZIP was invalid: {exc}")
    except ExternalFactorFormatError as exc:
        if not cache_path.exists():
            raise
        return _warn_and_load_cached_factors(
            cache_path,
            f"the upstream factor format was invalid: {exc}",
        )

    factors.to_csv(cache_path)
    return factors
|
|
|
|
|
|
def _select_stock_prices(price_data: pd.DataFrame, benchmark: str) -> pd.DataFrame:
|
|
stocks = price_data.drop(columns=[benchmark], errors="ignore")
|
|
return stocks.sort_index().astype(float)
|
|
|
|
|
|
def _long_short_factor(
|
|
scores: pd.DataFrame,
|
|
returns: pd.DataFrame,
|
|
quantile: float = 0.3,
|
|
) -> pd.Series:
|
|
lagged_scores = scores.shift(1)
|
|
high_cutoff = lagged_scores.quantile(1 - quantile, axis=1)
|
|
low_cutoff = lagged_scores.quantile(quantile, axis=1)
|
|
|
|
long_mask = lagged_scores.ge(high_cutoff, axis=0)
|
|
short_mask = lagged_scores.le(low_cutoff, axis=0)
|
|
long_returns = returns.where(long_mask).mean(axis=1)
|
|
short_returns = returns.where(short_mask).mean(axis=1)
|
|
return (long_returns - short_returns).rename(None)
|
|
|
|
|
|
def build_extension_factors(
    price_data: pd.DataFrame,
    benchmark: str,
    market: str,
) -> pd.DataFrame:
    """Build the MOM / LOWVOL / RECOVERY long-short extension factors.

    ``market`` is accepted for interface symmetry with the other builders but
    is unused here.
    """
    del market

    stocks = _select_stock_prices(price_data, benchmark)
    daily_returns = stocks.pct_change()

    # Momentum: 231-day return measured on prices lagged 21 days (skips the
    # most recent month).
    momentum = stocks.shift(21).pct_change(231)
    # Low volatility: negated trailing 60-day return std, so calmer names rank higher.
    low_volatility = -daily_returns.rolling(60, min_periods=60).std()
    # Recovery: distance of the current price above its trailing 63-day minimum.
    recovery = stocks / stocks.rolling(63, min_periods=63).min() - 1.0

    factor_series = {
        "MOM": _long_short_factor(momentum, daily_returns),
        "LOWVOL": _long_short_factor(low_volatility, daily_returns),
        "RECOVERY": _long_short_factor(recovery, daily_returns),
    }
    return pd.DataFrame(factor_series, index=price_data.index)
|
|
|
|
|
|
def _positive_share(values: np.ndarray) -> float:
|
|
return float(np.mean(values > 0))
|
|
|
|
|
|
def build_proxy_core_factors(
    price_data: pd.DataFrame,
    benchmark: str,
    market: str,
) -> pd.DataFrame:
    """Build price-only stand-ins for the FF5 core factors.

    ``market`` is accepted for interface symmetry but unused.  The MKT series
    uses the benchmark column's returns when that column exists, otherwise the
    equal-weight mean of stock returns.
    """
    del market

    stocks = _select_stock_prices(price_data, benchmark)
    daily_returns = stocks.pct_change()

    if benchmark in price_data:
        market_factor = price_data[benchmark].astype(float).pct_change()
    else:
        market_factor = daily_returns.mean(axis=1)

    # Price-based score proxies (assumes these track the classic FF sorts —
    # see each comment):
    # negated price level as a rough size stand-in,
    size_scores = -stocks
    # negated distance above the trailing 252-day low as a value stand-in,
    value_scores = -(stocks / stocks.rolling(252, min_periods=252).min() - 1.0)
    # share of up days over 63 days as a profitability stand-in,
    profitability_scores = daily_returns.rolling(63, min_periods=63).apply(_positive_share, raw=True)
    # negated 126-day price growth as an investment stand-in.
    investment_scores = -stocks.pct_change(126)

    proxy_series = {
        "MKT": market_factor,
        "SMB_PROXY": _long_short_factor(size_scores, daily_returns),
        "HML_PROXY": _long_short_factor(value_scores, daily_returns),
        "RMW_PROXY": _long_short_factor(profitability_scores, daily_returns),
        "CMA_PROXY": _long_short_factor(investment_scores, daily_returns),
    }
    return pd.DataFrame(proxy_series, index=price_data.index)
|
|
|
|
|
|
def prepare_factor_models(
    market: str,
    extension_factors: pd.DataFrame,
    proxy_factors: pd.DataFrame | None = None,
    external_factors: pd.DataFrame | None = None,
) -> dict[str, object]:
    """Assemble the combined factor frame plus model metadata for one market.

    A US market with external factors gets the standard CAPM / FF5 / FF5+
    model suite; any other configuration falls back to the proxy model and
    requires ``proxy_factors``.

    Raises:
        ValueError: when the proxy path is taken but ``proxy_factors`` is None.
    """
    use_external = market.lower() == "us" and external_factors is not None
    if use_external:
        return {
            "factor_frame": pd.concat([external_factors, extension_factors], axis=1),
            "models": {
                "capm": list(CAPM_FACTOR_COLUMNS),
                "ff5": list(FF5_FACTOR_COLUMNS),
                "ff5plus": list(FF5PLUS_FACTOR_COLUMNS),
            },
            "risk_free_col": "RF",
            "factor_source": "external+local",
            "proxy_only": False,
            "model_family": "standard",
        }

    if proxy_factors is None:
        raise ValueError("proxy_factors are required when external factors are unavailable")

    return {
        "factor_frame": pd.concat([proxy_factors, extension_factors], axis=1),
        "models": {"proxy": list(PROXY_FACTOR_COLUMNS)},
        "risk_free_col": None,
        "factor_source": "proxy_only",
        "proxy_only": True,
        "model_family": "proxy",
    }
|
|
|
|
|
|
def run_factor_regression(
    strategy_returns: pd.Series,
    factor_frame: pd.DataFrame,
    factor_cols: list[str],
    risk_free_col: str | None = None,
) -> dict[str, object]:
    """OLS regression of (excess) strategy returns on the given factor columns.

    Aligns the strategy series with the factor frame on the index, drops rows
    with any missing value, subtracts ``risk_free_col`` from the strategy
    returns when provided, and fits an intercept plus ``factor_cols``.

    Returns a dict with daily and annualized alpha plus its t-stat/p-value,
    per-factor betas/t-stats/p-values, R-squared and adjusted R-squared,
    annualized residual volatility, and the sample's date range and size.

    Raises:
        ValueError: if there is no index overlap, fewer rows than parameters,
            or the design matrix is rank-deficient.
    """
    # Inner join via concat+dropna: keep only dates where the strategy and
    # every requested factor (and the risk-free rate, if used) are present.
    regression_frame = pd.concat(
        [strategy_returns.rename("strategy"), factor_frame[factor_cols + ([risk_free_col] if risk_free_col else [])]],
        axis=1,
    ).dropna()

    if regression_frame.empty:
        raise ValueError("No overlapping strategy and factor observations were available for regression")

    y = regression_frame["strategy"].astype(float)
    if risk_free_col is not None:
        # Regress excess returns when a risk-free column is available.
        y = y - regression_frame[risk_free_col].astype(float)

    # Design matrix: a leading column of ones (intercept = alpha) followed by
    # the factor columns.
    x = regression_frame[factor_cols].astype(float).to_numpy()
    x = np.column_stack([np.ones(len(regression_frame)), x])
    n_obs = len(regression_frame)
    param_count = x.shape[1]
    if n_obs < param_count:
        raise ValueError(
            f"Insufficient observations for regression: need at least {param_count} rows, got {n_obs}"
        )

    coefficients, _, rank, _ = np.linalg.lstsq(x, y.to_numpy(), rcond=None)
    if rank < param_count:
        # lstsq still returns a minimum-norm solution in this case, but the
        # coefficients would not be uniquely identified, so refuse.
        raise ValueError(
            "Regression design matrix is rank-deficient; coefficients are not uniquely identified"
        )

    fitted = x @ coefficients
    residuals = y.to_numpy() - fitted
    residual_series = pd.Series(residuals, index=regression_frame.index)
    if len(residual_series) == 1:
        # A single observation has no sample std; report zero rather than NaN.
        residual_vol_ann = 0.0
    else:
        residual_vol_ann = float(residual_series.std(ddof=1) * np.sqrt(TRADING_DAYS_PER_YEAR))

    dof = n_obs - param_count
    if dof > 0:
        # Classical OLS inference: sigma^2 * (X'X)^-1 covariance, t-stats, and
        # two-sided p-values from the Student-t survival function.
        residual_variance = float((residuals @ residuals) / dof)
        covariance = residual_variance * np.linalg.pinv(x.T @ x)
        standard_errors = np.sqrt(np.diag(covariance))

        with np.errstate(divide="ignore", invalid="ignore"):
            # Guard against zero standard errors: those entries stay NaN.
            t_stats = np.divide(
                coefficients,
                standard_errors,
                out=np.full_like(coefficients, np.nan, dtype=float),
                where=standard_errors > 0,
            )
        p_values = 2.0 * stats.t.sf(np.abs(t_stats), df=dof)
        adj_r_squared_is_defined = True
    else:
        # Exactly-determined fit: no degrees of freedom for inference.
        t_stats = np.full_like(coefficients, np.nan, dtype=float)
        p_values = np.full_like(coefficients, np.nan, dtype=float)
        adj_r_squared_is_defined = False

    ss_total = float(((y - y.mean()) ** 2).sum())
    ss_residual = float(np.sum(residuals**2))
    # A constant target (ss_total == 0) is reported as R^2 = 0 by convention here.
    r_squared = 1.0 - ss_residual / ss_total if ss_total else 0.0
    if adj_r_squared_is_defined:
        adj_r_squared = 1.0 - (1.0 - r_squared) * (n_obs - 1) / (n_obs - param_count)
    else:
        adj_r_squared = float("nan")

    # Index 0 is the intercept; everything after it lines up with factor_cols.
    factor_slice = slice(1, None)
    return {
        "alpha_daily": float(coefficients[0]),
        "alpha_ann": float(coefficients[0] * TRADING_DAYS_PER_YEAR),
        "alpha_t_stat": float(t_stats[0]),
        "alpha_p_value": float(p_values[0]),
        "betas": {name: float(value) for name, value in zip(factor_cols, coefficients[factor_slice])},
        "t_stats": {name: float(value) for name, value in zip(factor_cols, t_stats[factor_slice])},
        "p_values": {name: float(value) for name, value in zip(factor_cols, p_values[factor_slice])},
        "r_squared": float(r_squared),
        "adj_r_squared": float(adj_r_squared),
        "residual_vol_ann": residual_vol_ann,
        "start_date": regression_frame.index.min().date().isoformat(),
        "end_date": regression_frame.index.max().date().isoformat(),
        "n_obs": n_obs,
    }
|
|
|
|
|
|
def _empty_attribution_frames() -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return empty (summary, loadings) frames with the canonical columns."""
    summary = pd.DataFrame(columns=SUMMARY_COLUMNS)
    loadings = pd.DataFrame(columns=LOADING_COLUMNS)
    return summary, loadings
|
|
|
|
|
|
def _select_model_names(
|
|
model_selection: str,
|
|
available_models: dict[str, list[str]],
|
|
) -> list[str]:
|
|
if model_selection == "all":
|
|
return list(available_models)
|
|
if model_selection in available_models:
|
|
return [model_selection]
|
|
return list(available_models)
|
|
|
|
|
|
def _resolve_benchmark_symbol(benchmark: str | None) -> str:
|
|
if benchmark is None:
|
|
return MISSING_BENCHMARK_SENTINEL
|
|
return benchmark
|
|
|
|
|
|
def attribute_strategies(
    results_df: pd.DataFrame,
    benchmark_label: str,
    price_data: pd.DataFrame,
    market: str,
    model_selection: str = "all",
    benchmark: str | None = None,
    external_factors: pd.DataFrame | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Run factor regressions for every strategy column in ``results_df``.

    Builds extension (and, where needed, proxy) factors from ``price_data``,
    loads external US factors when the market is "us" and none were supplied
    (warning and falling back to the proxy model on failure), then regresses
    each non-benchmark strategy's returns against every selected model.

    Returns ``(summary_df, loadings_df)``; a strategy/model pair whose
    regression raises ValueError is skipped with a warning rather than
    aborting the whole run.
    """
    benchmark_symbol = _resolve_benchmark_symbol(benchmark)
    extension_factors = build_extension_factors(price_data, benchmark=benchmark_symbol, market=market)

    market_name = market.lower()
    resolved_external_factors = external_factors
    if market_name == "us" and resolved_external_factors is None:
        try:
            resolved_external_factors = load_external_us_factors()
        except (ExternalFactorDownloadError, ExternalFactorFormatError, zipfile.BadZipFile) as exc:
            warnings.warn(
                f"Falling back to proxy factor attribution because external US factors were unavailable: {exc}",
                UserWarning,
                stacklevel=2,
            )
            resolved_external_factors = None

    needs_proxy = market_name != "us" or resolved_external_factors is None
    proxy_factors = (
        build_proxy_core_factors(price_data, benchmark=benchmark_symbol, market=market)
        if needs_proxy
        else None
    )

    prepared = prepare_factor_models(
        market=market,
        extension_factors=extension_factors,
        proxy_factors=proxy_factors,
        external_factors=resolved_external_factors,
    )
    model_names = _select_model_names(model_selection, prepared["models"])

    strategy_returns = results_df.sort_index().pct_change(fill_method=None)
    if strategy_returns.empty:
        return _empty_attribution_frames()

    summary_rows: list[dict[str, object]] = []
    loading_rows: list[dict[str, object]] = []
    # Fields repeated identically on every summary and loading row.
    shared_fields = {
        "market": market_name,
        "factor_source": prepared["factor_source"],
        "proxy_only": prepared["proxy_only"],
    }
    regression_stat_keys = (
        "start_date",
        "end_date",
        "n_obs",
        "alpha_daily",
        "alpha_ann",
        "alpha_t_stat",
        "alpha_p_value",
        "r_squared",
        "adj_r_squared",
        "residual_vol_ann",
    )

    for strategy_name in strategy_returns.columns:
        if strategy_name == benchmark_label:
            continue

        for model_name in model_names:
            try:
                result = run_factor_regression(
                    strategy_returns=strategy_returns[strategy_name],
                    factor_frame=prepared["factor_frame"],
                    factor_cols=prepared["models"][model_name],
                    risk_free_col=prepared["risk_free_col"],
                )
            except ValueError as exc:
                warnings.warn(
                    f"Skipping factor attribution for {strategy_name} ({model_name}): {exc}",
                    UserWarning,
                    stacklevel=2,
                )
                continue

            # Every beta column starts at NaN; only the factors this model
            # actually used are filled in below.
            summary_row: dict[str, object] = {
                "strategy": strategy_name,
                "model": model_name,
                **shared_fields,
                **{column: np.nan for column in set(SUMMARY_BETA_COLUMN_BY_FACTOR.values())},
            }
            for stat_key in regression_stat_keys:
                summary_row[stat_key] = result[stat_key]

            for factor_name, beta in result["betas"].items():
                beta_column = SUMMARY_BETA_COLUMN_BY_FACTOR.get(factor_name)
                if beta_column is not None:
                    summary_row[beta_column] = beta
                loading_rows.append(
                    {
                        "strategy": strategy_name,
                        "model": model_name,
                        **shared_fields,
                        "factor": factor_name,
                        "beta": beta,
                        "t_stat": result["t_stats"][factor_name],
                        "p_value": result["p_values"][factor_name],
                    }
                )

            summary_rows.append(summary_row)

    return (
        pd.DataFrame(summary_rows, columns=SUMMARY_COLUMNS),
        pd.DataFrame(loading_rows, columns=LOADING_COLUMNS),
    )
|
|
|
|
|
|
def export_attribution(
|
|
summary_df: pd.DataFrame,
|
|
loadings_df: pd.DataFrame,
|
|
output_dir: Path | str,
|
|
) -> None:
|
|
output_path = Path(output_dir)
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
summary_df.to_csv(output_path / "summary.csv", index=False)
|
|
loadings_df.to_csv(output_path / "loadings.csv", index=False)
|
|
|
|
|
|
def _describe_alpha(alpha_ann: float) -> str:
|
|
if alpha_ann > 0.02:
|
|
return "positive"
|
|
if alpha_ann < -0.02:
|
|
return "negative"
|
|
return "close to flat"
|
|
|
|
|
|
def _describe_fit(r_squared: float) -> str:
|
|
if r_squared >= 0.75:
|
|
return "strong"
|
|
if r_squared >= 0.4:
|
|
return "moderate"
|
|
return "weak"
|
|
|
|
|
|
def _top_loading_descriptions(row: pd.Series, limit: int = 2) -> str:
|
|
beta_columns = [column for column in row.index if column.startswith("beta_")]
|
|
present = []
|
|
for column in beta_columns:
|
|
value = row.get(column)
|
|
if pd.notna(value):
|
|
present.append((column.removeprefix("beta_").upper(), float(value)))
|
|
|
|
if not present:
|
|
return "no material factor loadings were estimated"
|
|
|
|
top_loadings = sorted(present, key=lambda item: abs(item[1]), reverse=True)[:limit]
|
|
return ", ".join(f"{name} {value:.2f}" for name, value in top_loadings)
|
|
|
|
|
|
def print_attribution_summary(summary_df: pd.DataFrame) -> None:
    """Print the attribution table plus a plain-English interpretation per row."""
    if summary_df.empty:
        print("Factor attribution: no usable regressions were produced.")
        return

    label_columns = {"strategy", "market", "model"}
    display_columns = [
        "strategy",
        "market",
        "model",
        "alpha_ann",
        "r_squared",
        "residual_vol_ann",
        "beta_mkt",
        "beta_smb",
        "beta_hml",
        "beta_rmw",
        "beta_cma",
        "beta_smb_proxy",
        "beta_hml_proxy",
        "beta_rmw_proxy",
        "beta_cma_proxy",
        "beta_mom",
        "beta_lowvol",
        "beta_recovery",
    ]
    numeric_columns = [column for column in display_columns if column not in label_columns]
    table = summary_df.reindex(columns=display_columns).copy()
    table.loc[:, numeric_columns] = table.loc[:, numeric_columns].round(4)

    print("\nFactor attribution")
    print(table.to_string(index=False, na_rep=""))
    print("\nInterpretation")
    for _, row in summary_df.iterrows():
        alpha_word = _describe_alpha(float(row['alpha_ann']))
        fit_word = _describe_fit(float(row['r_squared']))
        print(
            f"- {row['strategy']} / {row['model']}: estimated annualized alpha is "
            f"{alpha_word} ({row['alpha_ann']:.2%}); "
            f"strongest loadings are {_top_loading_descriptions(row)}; "
            f"model fit looks {fit_word} (R^2={row['r_squared']:.2f})."
        )
|