from __future__ import annotations import http.client import io import socket import ssl import warnings import zipfile from pathlib import Path from urllib.error import URLError from urllib.request import Request, urlopen import numpy as np import pandas as pd from scipy import stats KEN_FRENCH_DAILY_FF5_ZIP_URL = ( "https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/" "F-F_Research_Data_5_Factors_2x3_daily_CSV.zip" ) EXPECTED_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA", "RF"] CAPM_FACTOR_COLUMNS = ["MKT_RF"] FF5_FACTOR_COLUMNS = ["MKT_RF", "SMB", "HML", "RMW", "CMA"] EXTENSION_FACTOR_COLUMNS = ["MOM", "LOWVOL", "RECOVERY"] FF5PLUS_FACTOR_COLUMNS = FF5_FACTOR_COLUMNS + EXTENSION_FACTOR_COLUMNS PROXY_FACTOR_COLUMNS = [ "MKT", "SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY", ] + EXTENSION_FACTOR_COLUMNS TRADING_DAYS_PER_YEAR = 252 class ExternalFactorFormatError(ValueError): pass class ExternalFactorDownloadError(OSError): pass def _download_kf_zip_bytes() -> bytes: request = Request( KEN_FRENCH_DAILY_FF5_ZIP_URL, headers={"User-Agent": "quant-factor-attribution/0.1"}, ) try: with urlopen(request, timeout=30) as response: return response.read() except ( URLError, TimeoutError, ConnectionError, socket.timeout, socket.gaierror, ssl.SSLError, http.client.HTTPException, http.client.IncompleteRead, http.client.RemoteDisconnected, ) as exc: raise ExternalFactorDownloadError(f"Failed to download external factor data: {exc}") from exc def _parse_kf_daily_csv(raw_bytes: bytes) -> pd.DataFrame: with zipfile.ZipFile(io.BytesIO(raw_bytes)) as archive: member_names = [ name for name in archive.namelist() if not name.endswith("/") and name.lower().endswith((".csv", ".txt")) ] if not member_names: raise ExternalFactorFormatError("Ken French archive did not contain a CSV or TXT file") try: text = archive.read(member_names[0]).decode("utf-8-sig") except UnicodeDecodeError as exc: raise ExternalFactorFormatError("Ken French factor file was not valid UTF-8 text") from exc lines = [line for line in text.splitlines() if line.strip()] try: header_index = next(i for i, line in enumerate(lines) if "Mkt-RF" in line) except StopIteration as exc: raise ExternalFactorFormatError("Ken French factor file was missing the daily factor header") from exc table = "\n".join(lines[header_index:]) try: factors = pd.read_csv(io.StringIO(table)) except pd.errors.ParserError as exc: raise ExternalFactorFormatError("Ken French factor table could not be parsed") from exc factors = factors.rename(columns={"Mkt-RF": "MKT_RF"}) date_column = factors.columns[0] missing_columns = [column for column in EXPECTED_FACTOR_COLUMNS if column not in factors.columns] if missing_columns: raise ExternalFactorFormatError( f"Ken French factor table was missing columns: {', '.join(missing_columns)}" ) factors = factors[factors[date_column].astype(str).str.fullmatch(r"\d{8}")] if factors.empty: raise ExternalFactorFormatError("Ken French factor table did not contain daily rows") try: factors[date_column] = pd.to_datetime(factors[date_column], format="%Y%m%d") except ValueError as exc: raise ExternalFactorFormatError("Ken French factor table contained invalid dates") from exc factors = factors.set_index(date_column) factors.index.name = None try: factors = factors[EXPECTED_FACTOR_COLUMNS].astype(float) / 100.0 except ValueError as exc: raise ExternalFactorFormatError("Ken French factor table contained non-numeric values") from exc return factors def _warn_and_load_cached_factors(cache_path: Path, reason: str) -> pd.DataFrame: warnings.warn( f"Using cached data from {cache_path} because {reason}.", UserWarning, stacklevel=2, ) return pd.read_csv(cache_path, index_col=0, parse_dates=True) def load_external_us_factors(cache_dir: Path | str = "data/factors") -> pd.DataFrame: cache_path = Path(cache_dir) / "ff5_us_daily.csv" cache_path.parent.mkdir(parents=True, exist_ok=True) try: raw_bytes = _download_kf_zip_bytes() except ExternalFactorDownloadError as exc: if cache_path.exists(): return _warn_and_load_cached_factors(cache_path, f"download failed: {exc}") raise try: factors = _parse_kf_daily_csv(raw_bytes) except zipfile.BadZipFile as exc: if cache_path.exists(): return _warn_and_load_cached_factors(cache_path, f"the upstream ZIP was invalid: {exc}") raise except ExternalFactorFormatError as exc: if cache_path.exists(): return _warn_and_load_cached_factors( cache_path, f"the upstream factor format was invalid: {exc}", ) raise factors.to_csv(cache_path) return factors def _select_stock_prices(price_data: pd.DataFrame, benchmark: str) -> pd.DataFrame: stocks = price_data.drop(columns=[benchmark], errors="ignore") return stocks.sort_index().astype(float) def _long_short_factor( scores: pd.DataFrame, returns: pd.DataFrame, quantile: float = 0.3, ) -> pd.Series: lagged_scores = scores.shift(1) high_cutoff = lagged_scores.quantile(1 - quantile, axis=1) low_cutoff = lagged_scores.quantile(quantile, axis=1) long_mask = lagged_scores.ge(high_cutoff, axis=0) short_mask = lagged_scores.le(low_cutoff, axis=0) long_returns = returns.where(long_mask).mean(axis=1) short_returns = returns.where(short_mask).mean(axis=1) return (long_returns - short_returns).rename(None) def build_extension_factors( price_data: pd.DataFrame, benchmark: str, market: str, ) -> pd.DataFrame: del market stocks = _select_stock_prices(price_data, benchmark) returns = stocks.pct_change() momentum_scores = stocks.shift(21).pct_change(231) low_vol_scores = -returns.rolling(60, min_periods=60).std() recovery_scores = stocks / stocks.rolling(63, min_periods=63).min() - 1.0 return pd.DataFrame( { "MOM": _long_short_factor(momentum_scores, returns), "LOWVOL": _long_short_factor(low_vol_scores, returns), "RECOVERY": _long_short_factor(recovery_scores, returns), }, index=price_data.index, ) def _positive_share(values: np.ndarray) -> float: return float(np.mean(values > 0)) def build_proxy_core_factors( price_data: pd.DataFrame, benchmark: str, market: str, ) -> pd.DataFrame: del market stocks = _select_stock_prices(price_data, benchmark) returns = stocks.pct_change() if benchmark in price_data: market_factor = price_data[benchmark].astype(float).pct_change() else: market_factor = returns.mean(axis=1) inverse_price_scores = -stocks value_proxy_scores = -(stocks / stocks.rolling(252, min_periods=252).min() - 1.0) profitability_proxy_scores = returns.rolling(63, min_periods=63).apply(_positive_share, raw=True) investment_proxy_scores = -stocks.pct_change(126) return pd.DataFrame( { "MKT": market_factor, "SMB_PROXY": _long_short_factor(inverse_price_scores, returns), "HML_PROXY": _long_short_factor(value_proxy_scores, returns), "RMW_PROXY": _long_short_factor(profitability_proxy_scores, returns), "CMA_PROXY": _long_short_factor(investment_proxy_scores, returns), }, index=price_data.index, ) def prepare_factor_models( market: str, extension_factors: pd.DataFrame, proxy_factors: pd.DataFrame | None = None, external_factors: pd.DataFrame | None = None, ) -> dict[str, object]: market_name = market.lower() if market_name == "us" and external_factors is not None: factor_frame = pd.concat([external_factors, extension_factors], axis=1) return { "factor_frame": factor_frame, "models": { "capm": CAPM_FACTOR_COLUMNS.copy(), "ff5": FF5_FACTOR_COLUMNS.copy(), "ff5plus": FF5PLUS_FACTOR_COLUMNS.copy(), }, "risk_free_col": "RF", "factor_source": "external+local", "proxy_only": False, "model_family": "standard", } if proxy_factors is None: raise ValueError("proxy_factors are required when external factors are unavailable") factor_frame = pd.concat([proxy_factors, extension_factors], axis=1) return { "factor_frame": factor_frame, "models": {"proxy": PROXY_FACTOR_COLUMNS.copy()}, "risk_free_col": None, "factor_source": "proxy_only", "proxy_only": True, "model_family": "proxy", } def run_factor_regression( strategy_returns: pd.Series, factor_frame: pd.DataFrame, factor_cols: list[str], risk_free_col: str | None = None, ) -> dict[str, object]: regression_frame = pd.concat( [strategy_returns.rename("strategy"), factor_frame[factor_cols + ([risk_free_col] if risk_free_col else [])]], axis=1, ).dropna() if regression_frame.empty: raise ValueError("No overlapping strategy and factor observations were available for regression") y = regression_frame["strategy"].astype(float) if risk_free_col is not None: y = y - regression_frame[risk_free_col].astype(float) x = regression_frame[factor_cols].astype(float).to_numpy() x = np.column_stack([np.ones(len(regression_frame)), x]) n_obs = len(regression_frame) param_count = x.shape[1] if n_obs <= param_count: raise ValueError( f"Insufficient observations for regression: need more than {param_count} rows, got {n_obs}" ) coefficients, _, rank, _ = np.linalg.lstsq(x, y.to_numpy(), rcond=None) if rank < param_count: raise ValueError( "Regression design matrix is rank-deficient; coefficients are not uniquely identified" ) fitted = x @ coefficients residuals = y.to_numpy() - fitted dof = n_obs - param_count residual_variance = float((residuals @ residuals) / dof) covariance = residual_variance * np.linalg.pinv(x.T @ x) standard_errors = np.sqrt(np.diag(covariance)) with np.errstate(divide="ignore", invalid="ignore"): t_stats = np.divide( coefficients, standard_errors, out=np.full_like(coefficients, np.nan, dtype=float), where=standard_errors > 0, ) p_values = 2.0 * stats.t.sf(np.abs(t_stats), df=dof) ss_total = float(((y - y.mean()) ** 2).sum()) ss_residual = float(np.sum(residuals**2)) r_squared = 1.0 - ss_residual / ss_total if ss_total else 0.0 if n_obs > param_count: adj_r_squared = 1.0 - (1.0 - r_squared) * (n_obs - 1) / (n_obs - param_count) else: adj_r_squared = 0.0 factor_slice = slice(1, None) residual_series = pd.Series(residuals, index=regression_frame.index) return { "alpha_daily": float(coefficients[0]), "alpha_ann": float(coefficients[0] * TRADING_DAYS_PER_YEAR), "alpha_t_stat": float(t_stats[0]), "alpha_p_value": float(p_values[0]), "betas": {name: float(value) for name, value in zip(factor_cols, coefficients[factor_slice])}, "t_stats": {name: float(value) for name, value in zip(factor_cols, t_stats[factor_slice])}, "p_values": {name: float(value) for name, value in zip(factor_cols, p_values[factor_slice])}, "r_squared": float(r_squared), "adj_r_squared": float(adj_r_squared), "residual_vol_ann": float(residual_series.std(ddof=1) * np.sqrt(TRADING_DAYS_PER_YEAR)), "start_date": regression_frame.index.min().date().isoformat(), "end_date": regression_frame.index.max().date().isoformat(), "n_obs": n_obs, }