from __future__ import annotations import json import http.client import io import socket import ssl import warnings import zipfile from pathlib import Path from urllib.error import URLError from urllib.request import Request, urlopen import numpy as np import pandas as pd from scipy import stats KEN_FRENCH_DAILY_FF5_ZIP_URL = ( "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/" "F-F_Research_Data_5_Factors_2x3_daily_CSV.zip" ) EXPECTED_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA", "RF"] CAPM_FACTOR_COLUMNS = ["MKT_RF"] FF5_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA"] EXTENSION_FACTOR_COLUMNS = ["MOM", "LOWVOL", "RECOVERY"] FF5PLUS_FACTOR_COLUMNS = FF5_FACTOR_COLUMNS + EXTENSION_FACTOR_COLUMNS PROXY_FACTOR_COLUMNS = [ "MKT", "SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY", ] + EXTENSION_FACTOR_COLUMNS TRADING_DAYS_PER_YEAR = 252 MISSING_BENCHMARK_SENTINEL = "__missing_benchmark__" SUMMARY_BETA_COLUMN_BY_FACTOR = { "MKT_RF": "beta_mkt", "MKT": "beta_mkt", "SMB": "beta_smb", "SMB_PROXY": "beta_smb", "HML": "beta_hml", "HML_PROXY": "beta_hml", "RMW": "beta_rmw", "RMW_PROXY": "beta_rmw", "CMA": "beta_cma", "CMA_PROXY": "beta_cma", "MOM": "beta_mom", "LOWVOL": "beta_lowvol", "RECOVERY": "beta_recovery", } SUMMARY_COLUMNS = [ "strategy", "market", "model", "factor_source", "proxy_only", "beta_semantics", "start_date", "end_date", "n_obs", "alpha_daily", "alpha_ann", "alpha_t_stat", "alpha_p_value", "r_squared", "adj_r_squared", "residual_vol_ann", "beta_mkt", "beta_smb", "beta_hml", "beta_rmw", "beta_cma", "beta_mom", "beta_lowvol", "beta_recovery", ] LOADING_COLUMNS = [ "strategy", "market", "model", "factor_source", "proxy_only", "factor", "beta", "t_stat", "p_value", ] SEMANTIC_BETA_COLUMNS = [ "beta_mkt", "beta_smb", "beta_hml", "beta_rmw", "beta_cma", "beta_mom", "beta_lowvol", "beta_recovery", ] class ExternalFactorFormatError(ValueError): pass class ExternalFactorDownloadError(OSError): pass def _download_kf_zip_bytes() -> bytes: request = Request( KEN_FRENCH_DAILY_FF5_ZIP_URL, headers={"User-Agent": "quant-factor-attribution/0.1"}, ) try: with urlopen(request, timeout=30) as response: return response.read() except ( URLError, TimeoutError, ConnectionError, socket.timeout, socket.gaierror, ssl.SSLError, http.client.HTTPException, http.client.IncompleteRead, http.client.RemoteDisconnected, ) as exc: raise ExternalFactorDownloadError(f"Failed to download external factor data: {exc}") from exc def _parse_kf_daily_csv(raw_bytes: bytes) -> pd.DataFrame: with zipfile.ZipFile(io.BytesIO(raw_bytes)) as archive: member_names = [ name for name in archive.namelist() if not name.endswith("/") and name.lower().endswith((".csv", ".txt")) ] if not member_names: raise ExternalFactorFormatError("Ken French archive did not contain a CSV or TXT file") try: text = archive.read(member_names[0]).decode("utf-8-sig") except UnicodeDecodeError as exc: raise ExternalFactorFormatError("Ken French factor file was not valid UTF-8 text") from exc lines = [line for line in text.splitlines() if line.strip()] try: header_index = next(i for i, line in enumerate(lines) if "Mkt-RF" in line) except StopIteration as exc: raise ExternalFactorFormatError("Ken French factor file was missing the daily factor header") from exc table = "\n".join(lines[header_index:]) try: factors = pd.read_csv(io.StringIO(table)) except pd.errors.ParserError as exc: raise ExternalFactorFormatError("Ken French factor table could not be parsed") from exc factors = factors.rename(columns={"Mkt-RF": "MKT_RF"}) date_column = factors.columns[0] missing_columns = [column for column in EXPECTED_FACTOR_COLUMNS if column not in factors.columns] if missing_columns: raise ExternalFactorFormatError( f"Ken French factor table was missing columns: {', '.join(missing_columns)}" ) factors = factors[factors[date_column].astype(str).str.fullmatch(r"\d{8}")] if factors.empty: raise ExternalFactorFormatError("Ken French factor table did not contain daily rows") try: factors[date_column] = pd.to_datetime(factors[date_column], format="%Y%m%d") except ValueError as exc: raise ExternalFactorFormatError("Ken French factor table contained invalid dates") from exc factors = factors.set_index(date_column) factors.index.name = None try: factors = factors[EXPECTED_FACTOR_COLUMNS].astype(float) / 100.0 except ValueError as exc: raise ExternalFactorFormatError("Ken French factor table contained non-numeric values") from exc return factors def _warn_and_load_cached_factors(cache_path: Path, reason: str) -> pd.DataFrame: warnings.warn( f"Using cached data from {cache_path} because {reason}.", UserWarning, stacklevel=2, ) return pd.read_csv(cache_path, index_col=0, parse_dates=True) def load_external_us_factors(cache_dir: Path | str = "data/factors") -> pd.DataFrame: cache_path = Path(cache_dir) / "ff5_us_daily.csv" cache_path.parent.mkdir(parents=True, exist_ok=True) try: raw_bytes = _download_kf_zip_bytes() except ExternalFactorDownloadError as exc: if cache_path.exists(): return _warn_and_load_cached_factors(cache_path, f"download failed: {exc}") raise try: factors = _parse_kf_daily_csv(raw_bytes) except zipfile.BadZipFile as exc: if cache_path.exists(): return _warn_and_load_cached_factors(cache_path, f"the upstream ZIP was invalid: {exc}") raise except ExternalFactorFormatError as exc: if cache_path.exists(): return _warn_and_load_cached_factors( cache_path, f"the upstream factor format was invalid: {exc}", ) raise factors.to_csv(cache_path) return factors def _select_stock_prices(price_data: pd.DataFrame, benchmark: str) -> pd.DataFrame: stocks = price_data.drop(columns=[benchmark], errors="ignore") return stocks.sort_index().astype(float) def _long_short_factor( scores: pd.DataFrame, returns: pd.DataFrame, quantile: float = 0.3, ) -> pd.Series: lagged_scores = scores.shift(1) high_cutoff = lagged_scores.quantile(1 - quantile, axis=1) low_cutoff = lagged_scores.quantile(quantile, axis=1) long_mask = lagged_scores.ge(high_cutoff, axis=0) short_mask = lagged_scores.le(low_cutoff, axis=0) long_returns = returns.where(long_mask).mean(axis=1) short_returns = returns.where(short_mask).mean(axis=1) return (long_returns - short_returns).rename(None) def build_extension_factors( price_data: pd.DataFrame, benchmark: str, market: str, ) -> pd.DataFrame: del market stocks = _select_stock_prices(price_data, benchmark) returns = stocks.pct_change() momentum_scores = stocks.shift(21).pct_change(231) low_vol_scores = -returns.rolling(60, min_periods=60).std() recovery_scores = stocks / stocks.rolling(63, min_periods=63).min() - 1.0 return pd.DataFrame( { "MOM": _long_short_factor(momentum_scores, returns), "LOWVOL": _long_short_factor(low_vol_scores, returns), "RECOVERY": _long_short_factor(recovery_scores, returns), }, index=price_data.index, ) def _positive_share(values: np.ndarray) -> float: return float(np.mean(values > 0)) def build_proxy_core_factors( price_data: pd.DataFrame, benchmark: str, market: str, ) -> pd.DataFrame: del market stocks = _select_stock_prices(price_data, benchmark) returns = stocks.pct_change() if benchmark in price_data: market_factor = price_data[benchmark].astype(float).pct_change() else: market_factor = returns.mean(axis=1) inverse_price_scores = -stocks value_proxy_scores = -(stocks / stocks.rolling(252, min_periods=252).min() - 1.0) profitability_proxy_scores = returns.rolling(63, min_periods=63).apply(_positive_share, raw=True) investment_proxy_scores = -stocks.pct_change(126) return pd.DataFrame( { "MKT": market_factor, "SMB_PROXY": _long_short_factor(inverse_price_scores, returns), "HML_PROXY": _long_short_factor(value_proxy_scores, returns), "RMW_PROXY": _long_short_factor(profitability_proxy_scores, returns), "CMA_PROXY": _long_short_factor(investment_proxy_scores, returns), }, index=price_data.index, ) def prepare_factor_models( market: str, extension_factors: pd.DataFrame, proxy_factors: pd.DataFrame | None = None, external_factors: pd.DataFrame | None = None, ) -> dict[str, object]: market_name = market.lower() if market_name == "us" and external_factors is not None: factor_frame = pd.concat([external_factors, extension_factors], axis=1) return { "factor_frame": factor_frame, "models": { "capm": CAPM_FACTOR_COLUMNS.copy(), "ff5": FF5_FACTOR_COLUMNS.copy(), "ff5plus": FF5PLUS_FACTOR_COLUMNS.copy(), }, "risk_free_col": "RF", "factor_source": "external+local", "proxy_only": False, "model_family": "standard", } if proxy_factors is None: raise ValueError("proxy_factors are required when external factors are unavailable") factor_frame = pd.concat([proxy_factors, extension_factors], axis=1) return { "factor_frame": factor_frame, "models": {"proxy": PROXY_FACTOR_COLUMNS.copy()}, "risk_free_col": None, "factor_source": "proxy_only", "proxy_only": True, "model_family": "proxy", } def run_factor_regression( strategy_returns: pd.Series, factor_frame: pd.DataFrame, factor_cols: list[str], risk_free_col: str | None = None, ) -> dict[str, object]: regression_frame = pd.concat( [strategy_returns.rename("strategy"), factor_frame[factor_cols + ([risk_free_col] if risk_free_col else [])]], axis=1, ).dropna() if regression_frame.empty: raise ValueError("No overlapping strategy and factor observations were available for regression") y = regression_frame["strategy"].astype(float) if risk_free_col is not None: y = y - regression_frame[risk_free_col].astype(float) x = regression_frame[factor_cols].astype(float).to_numpy() x = np.column_stack([np.ones(len(regression_frame)), x]) n_obs = len(regression_frame) param_count = x.shape[1] if n_obs < param_count: raise ValueError( f"Insufficient observations for regression: need at least {param_count} rows, got {n_obs}" ) coefficients, _, rank, _ = np.linalg.lstsq(x, y.to_numpy(), rcond=None) if rank < param_count: raise ValueError( "Regression design matrix is rank-deficient; coefficients are not uniquely identified" ) fitted = x @ coefficients residuals = y.to_numpy() - fitted residual_series = pd.Series(residuals, index=regression_frame.index) if len(residual_series) == 1: residual_vol_ann = 0.0 else: residual_vol_ann = float(residual_series.std(ddof=1) * np.sqrt(TRADING_DAYS_PER_YEAR)) dof = n_obs - param_count if dof > 0: residual_variance = float((residuals @ residuals) / dof) covariance = residual_variance * np.linalg.pinv(x.T @ x) standard_errors = np.sqrt(np.diag(covariance)) with np.errstate(divide="ignore", invalid="ignore"): t_stats = np.divide( coefficients, standard_errors, out=np.full_like(coefficients, np.nan, dtype=float), where=standard_errors > 0, ) p_values = 2.0 * stats.t.sf(np.abs(t_stats), df=dof) adj_r_squared_is_defined = True else: t_stats = np.full_like(coefficients, np.nan, dtype=float) p_values = np.full_like(coefficients, np.nan, dtype=float) adj_r_squared_is_defined = False ss_total = float(((y - y.mean()) ** 2).sum()) ss_residual = float(np.sum(residuals**2)) r_squared = 1.0 - ss_residual / ss_total if ss_total else 0.0 if adj_r_squared_is_defined: adj_r_squared = 1.0 - (1.0 - r_squared) * (n_obs - 1) / (n_obs - param_count) else: adj_r_squared = float("nan") factor_slice = slice(1, None) return { "alpha_daily": float(coefficients[0]), "alpha_ann": float(coefficients[0] * TRADING_DAYS_PER_YEAR), "alpha_t_stat": float(t_stats[0]), "alpha_p_value": float(p_values[0]), "betas": {name: float(value) for name, value in zip(factor_cols, coefficients[factor_slice])}, "t_stats": {name: float(value) for name, value in zip(factor_cols, t_stats[factor_slice])}, "p_values": {name: float(value) for name, value in zip(factor_cols, p_values[factor_slice])}, "r_squared": float(r_squared), "adj_r_squared": float(adj_r_squared), "residual_vol_ann": residual_vol_ann, "start_date": regression_frame.index.min().date().isoformat(), "end_date": regression_frame.index.max().date().isoformat(), "n_obs": n_obs, } def _empty_attribution_frames() -> tuple[pd.DataFrame, pd.DataFrame]: return ( pd.DataFrame(columns=SUMMARY_COLUMNS), pd.DataFrame(columns=LOADING_COLUMNS), ) def _select_model_names( model_selection: str, available_models: dict[str, list[str]], ) -> list[str]: if model_selection == "all": return list(available_models) if model_selection in available_models: return [model_selection] return list(available_models) def _resolve_benchmark_symbol(benchmark: str | None) -> str: if benchmark is None: return MISSING_BENCHMARK_SENTINEL return benchmark def _beta_semantics_map(proxy_only: bool) -> dict[str, str]: return { "beta_mkt": "MKT" if proxy_only else "MKT_RF", "beta_smb": "SMB_PROXY" if proxy_only else "SMB", "beta_hml": "HML_PROXY" if proxy_only else "HML", "beta_rmw": "RMW_PROXY" if proxy_only else "RMW", "beta_cma": "CMA_PROXY" if proxy_only else "CMA", "beta_mom": "MOM", "beta_lowvol": "LOWVOL", "beta_recovery": "RECOVERY", } def _resolve_beta_semantics(row: pd.Series) -> dict[str, str]: canonical = _beta_semantics_map(bool(row.get("proxy_only", False))) raw_value = row.get("beta_semantics") if isinstance(raw_value, str) and raw_value: try: parsed = json.loads(raw_value) except json.JSONDecodeError: return canonical else: if isinstance(parsed, dict): parsed_mapping = {str(key): str(value) for key, value in parsed.items()} if set(parsed_mapping) == set(SEMANTIC_BETA_COLUMNS) and parsed_mapping == canonical: return parsed_mapping return canonical def _section_beta_header_map(summary_df: pd.DataFrame) -> dict[str, str]: if summary_df.empty: return {} semantics = _resolve_beta_semantics(summary_df.iloc[0]) header_map: dict[str, str] = {} for beta_column, factor_name in semantics.items(): suffix = factor_name.lower() if suffix == "mkt_rf": suffix = "mkt" header_map[beta_column] = f"beta_{suffix}" return header_map def attribute_strategies( results_df: pd.DataFrame, benchmark_label: str, price_data: pd.DataFrame, market: str, model_selection: str = "all", benchmark: str | None = None, external_factors: pd.DataFrame | None = None, ) -> tuple[pd.DataFrame, pd.DataFrame]: benchmark_symbol = _resolve_benchmark_symbol(benchmark) extension_factors = build_extension_factors(price_data, benchmark=benchmark_symbol, market=market) resolved_external_factors = external_factors market_name = market.lower() if market_name == "us" and resolved_external_factors is None: try: resolved_external_factors = load_external_us_factors() except (ExternalFactorDownloadError, ExternalFactorFormatError, zipfile.BadZipFile) as exc: warnings.warn( f"Falling back to proxy factor attribution because external US factors were unavailable: {exc}", UserWarning, stacklevel=2, ) resolved_external_factors = None proxy_factors = None if market_name != "us" or resolved_external_factors is None: proxy_factors = build_proxy_core_factors(price_data, benchmark=benchmark_symbol, market=market) prepared = prepare_factor_models( market=market, extension_factors=extension_factors, proxy_factors=proxy_factors, external_factors=resolved_external_factors, ) model_names = _select_model_names(model_selection, prepared["models"]) strategy_returns = results_df.sort_index().pct_change(fill_method=None) if strategy_returns.empty: return _empty_attribution_frames() summary_rows: list[dict[str, object]] = [] loading_rows: list[dict[str, object]] = [] for strategy_name in strategy_returns.columns: if strategy_name == benchmark_label: continue for model_name in model_names: factor_cols = prepared["models"][model_name] try: regression_result = run_factor_regression( strategy_returns=strategy_returns[strategy_name], factor_frame=prepared["factor_frame"], factor_cols=factor_cols, risk_free_col=prepared["risk_free_col"], ) except ValueError as exc: warnings.warn( f"Skipping factor attribution for {strategy_name} ({model_name}): {exc}", UserWarning, stacklevel=2, ) continue summary_row: dict[str, object] = { "strategy": strategy_name, "market": market_name, "model": model_name, "factor_source": prepared["factor_source"], "proxy_only": prepared["proxy_only"], "beta_semantics": json.dumps(_beta_semantics_map(bool(prepared["proxy_only"])), sort_keys=True), "start_date": regression_result["start_date"], "end_date": regression_result["end_date"], "n_obs": regression_result["n_obs"], "alpha_daily": regression_result["alpha_daily"], "alpha_ann": regression_result["alpha_ann"], "alpha_t_stat": regression_result["alpha_t_stat"], "alpha_p_value": regression_result["alpha_p_value"], "r_squared": regression_result["r_squared"], "adj_r_squared": regression_result["adj_r_squared"], "residual_vol_ann": regression_result["residual_vol_ann"], "beta_mkt": np.nan, "beta_smb": np.nan, "beta_hml": np.nan, "beta_rmw": np.nan, "beta_cma": np.nan, "beta_mom": np.nan, "beta_lowvol": np.nan, "beta_recovery": np.nan, } for factor_name, beta in regression_result["betas"].items(): summary_column = SUMMARY_BETA_COLUMN_BY_FACTOR.get(factor_name) if summary_column is not None: summary_row[summary_column] = beta loading_rows.append( { "strategy": strategy_name, "market": market_name, "model": model_name, "factor_source": prepared["factor_source"], "proxy_only": prepared["proxy_only"], "factor": factor_name, "beta": beta, "t_stat": regression_result["t_stats"][factor_name], "p_value": regression_result["p_values"][factor_name], } ) summary_rows.append(summary_row) summary_df = pd.DataFrame(summary_rows, columns=SUMMARY_COLUMNS) loadings_df = pd.DataFrame(loading_rows, columns=LOADING_COLUMNS) return summary_df, loadings_df def export_attribution( summary_df: pd.DataFrame, loadings_df: pd.DataFrame, output_dir: Path | str, ) -> None: output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) summary_df.to_csv(output_path / "summary.csv", index=False) loadings_df.to_csv(output_path / "loadings.csv", index=False) def _describe_alpha(alpha_ann: float) -> str: if alpha_ann > 0.02: return "positive" if alpha_ann < -0.02: return "negative" return "close to flat" def _describe_fit(r_squared: float) -> str: if r_squared >= 0.75: return "strong" if r_squared >= 0.4: return "moderate" return "weak" def _top_loading_descriptions(row: pd.Series, limit: int = 2) -> str: beta_columns = [column for column in row.index if column.startswith("beta_")] factor_labels = _resolve_beta_semantics(row) present = [] for column in beta_columns: value = row.get(column) label = factor_labels.get(column) if label is not None and pd.notna(value): present.append((label, float(value))) if not present: return "no material factor loadings were estimated" top_loadings = sorted(present, key=lambda item: abs(item[1]), reverse=True)[:limit] return ", ".join(f"{name} {value:.2f}" for name, value in top_loadings) def _print_attribution_section(summary_df: pd.DataFrame, title: str, proxy_labels: bool) -> None: display_columns = [ "strategy", "market", "model", "factor_source", "proxy_only", "alpha_ann", "r_squared", "residual_vol_ann", "beta_mkt", "beta_smb", "beta_hml", "beta_rmw", "beta_cma", "beta_mom", "beta_lowvol", "beta_recovery", ] table = summary_df.reindex(columns=display_columns).copy() del proxy_labels table = table.rename(columns=_section_beta_header_map(summary_df)) numeric_columns = [ column for column in table.columns if column not in {"strategy", "market", "model", "factor_source", "proxy_only"} ] table.loc[:, numeric_columns] = table.loc[:, numeric_columns].round(4) print(f"\n{title}") print(table.to_string(index=False, na_rep="")) def print_attribution_summary(summary_df: pd.DataFrame) -> None: if summary_df.empty: print("Factor attribution: no usable regressions were produced.") return proxy_mask = summary_df["proxy_only"].fillna(False).astype(bool) standard_rows = summary_df.loc[~proxy_mask] proxy_rows = summary_df.loc[proxy_mask] print("\nFactor attribution") if not standard_rows.empty: _print_attribution_section( standard_rows, title="Standard factor attribution", proxy_labels=False, ) if not proxy_rows.empty: _print_attribution_section( proxy_rows, title="Proxy factor attribution", proxy_labels=True, ) print("\nInterpretation") for _, row in summary_df.iterrows(): print( f"- {row['strategy']} / {row['model']}: estimated annualized alpha is " f"{_describe_alpha(float(row['alpha_ann']))} ({row['alpha_ann']:.2%}); " f"strongest loadings are {_top_loading_descriptions(row)}; " f"model fit looks {_describe_fit(float(row['r_squared']))} (R^2={row['r_squared']:.2f})." )