From 7e8d24c1e9fae6c12e8a279615ff5f997ad53ae4 Mon Sep 17 00:00:00 2001
From: Gahow Wang
Date: Tue, 7 Apr 2026 16:16:59 +0800
Subject: [PATCH] Add local attribution factor builders

---
 factor_attribution.py            | 123 ++++++++++++++++++++++++++++++++++++
 tests/test_factor_attribution.py |  58 +++++++++++++++++
 2 files changed, 181 insertions(+)

diff --git a/factor_attribution.py b/factor_attribution.py
index 6e79476..67ec85a 100644
--- a/factor_attribution.py
+++ b/factor_attribution.py
@@ -10,6 +10,7 @@ from pathlib import Path
 from urllib.error import URLError
 from urllib.request import Request, urlopen
 
+import numpy as np
 import pandas as pd
 
 KEN_FRENCH_DAILY_FF5_ZIP_URL = (
@@ -140,3 +141,125 @@ def load_external_us_factors(cache_dir: Path | str = "data/factors") -> pd.DataF
 
     factors.to_csv(cache_path)
     return factors
+
+
+def _select_stock_prices(price_data: pd.DataFrame, benchmark: str) -> pd.DataFrame:
+    """Return the non-benchmark price columns as floats, sorted by index.
+
+    The benchmark column is dropped when present and ignored when absent.
+    """
+    stocks = price_data.drop(columns=[benchmark], errors="ignore")
+    return stocks.sort_index().astype(float)
+
+
+def _long_short_factor(
+    scores: pd.DataFrame,
+    returns: pd.DataFrame,
+    quantile: float = 0.3,
+) -> pd.Series:
+    """Return the equal-weighted long-minus-short return for each row.
+
+    Scores are lagged by one row, so the return on a row is assigned using
+    the previous row's scores. Columns whose lagged score is at or above the
+    ``1 - quantile`` row quantile form the long leg; columns at or below the
+    ``quantile`` row quantile form the short leg. Rows where either leg is
+    empty (for example during score warm-up) yield NaN.
+    """
+    lagged_scores = scores.shift(1)
+    high_cutoff = lagged_scores.quantile(1 - quantile, axis=1)
+    low_cutoff = lagged_scores.quantile(quantile, axis=1)
+
+    long_mask = lagged_scores.ge(high_cutoff, axis=0)
+    short_mask = lagged_scores.le(low_cutoff, axis=0)
+    long_returns = returns.where(long_mask).mean(axis=1)
+    short_returns = returns.where(short_mask).mean(axis=1)
+    return (long_returns - short_returns).rename(None)
+
+
+def build_extension_factors(
+    price_data: pd.DataFrame,
+    benchmark: str,
+    market: str,
+) -> pd.DataFrame:
+    """Build the MOM, LOWVOL and RECOVERY long-short factors from prices.
+
+    Scores are computed on every column except ``benchmark``:
+
+    - MOM: 231-row price change, measured as of 21 rows earlier.
+    - LOWVOL: negated 60-row rolling standard deviation of returns.
+    - RECOVERY: price relative to its 63-row rolling minimum, minus one.
+
+    ``market`` is accepted but not used. The result is indexed like
+    ``price_data``.
+    """
+    del market
+
+    stocks = _select_stock_prices(price_data, benchmark)
+    returns = stocks.pct_change()
+
+    momentum_scores = stocks.shift(21).pct_change(231)
+    low_vol_scores = -returns.rolling(60, min_periods=60).std()
+    recovery_scores = stocks / stocks.rolling(63, min_periods=63).min() - 1.0
+
+    return pd.DataFrame(
+        {
+            "MOM": _long_short_factor(momentum_scores, returns),
+            "LOWVOL": _long_short_factor(low_vol_scores, returns),
+            "RECOVERY": _long_short_factor(recovery_scores, returns),
+        },
+        index=price_data.index,
+    )
+
+
+def _positive_share(values: np.ndarray) -> float:
+    """Return the share of strictly positive entries in ``values``."""
+    return float(np.mean(values > 0))
+
+
+def build_proxy_core_factors(
+    price_data: pd.DataFrame,
+    benchmark: str,
+    market: str,
+) -> pd.DataFrame:
+    """Build MKT plus price-based proxies for the SMB, HML, RMW and CMA factors.
+
+    MKT is the benchmark's return when ``benchmark`` is a column of
+    ``price_data``; otherwise it is the equal-weighted mean stock return.
+    The proxy scores are computed on every column except ``benchmark``:
+
+    - SMB_PROXY: negated price level.
+    - HML_PROXY: negated price gain over the 252-row rolling minimum.
+    - RMW_PROXY: share of positive returns over the last 63 rows.
+    - CMA_PROXY: negated 126-row price change.
+
+    ``market`` is accepted but not used. The result is indexed like
+    ``price_data``.
+    """
+    del market
+
+    stocks = _select_stock_prices(price_data, benchmark)
+    returns = stocks.pct_change()
+
+    if benchmark in price_data:
+        # Sort like the stock prices so the market return is always taken
+        # between index-adjacent rows, even when price_data arrives unsorted.
+        benchmark_prices = price_data[benchmark].sort_index().astype(float)
+        market_factor = benchmark_prices.pct_change()
+    else:
+        market_factor = returns.mean(axis=1)
+
+    inverse_price_scores = -stocks
+    value_proxy_scores = -(stocks / stocks.rolling(252, min_periods=252).min() - 1.0)
+    profitability_proxy_scores = returns.rolling(63, min_periods=63).apply(_positive_share, raw=True)
+    investment_proxy_scores = -stocks.pct_change(126)
+
+    return pd.DataFrame(
+        {
+            "MKT": market_factor,
+            "SMB_PROXY": _long_short_factor(inverse_price_scores, returns),
+            "HML_PROXY": _long_short_factor(value_proxy_scores, returns),
+            "RMW_PROXY": _long_short_factor(profitability_proxy_scores, returns),
+            "CMA_PROXY": _long_short_factor(investment_proxy_scores, returns),
+        },
+        index=price_data.index,
+    )
diff --git a/tests/test_factor_attribution.py b/tests/test_factor_attribution.py
index 279c553..a259834 100644
--- a/tests/test_factor_attribution.py
+++ b/tests/test_factor_attribution.py
@@ -9,6 +9,7 @@ from pathlib import Path
 from urllib.error import URLError
 from unittest import mock
 
+import numpy as np
 import pandas as pd
 
 from factor_attribution import (
@@ -17,6 +18,8 @@ from factor_attribution import (
     KEN_FRENCH_DAILY_FF5_ZIP_URL,
     _download_kf_zip_bytes,
     _parse_kf_daily_csv,
+    build_extension_factors,
+    build_proxy_core_factors,
     load_external_us_factors,
 )
 
@@ -289,3 +292,58 @@ class ExternalFactorLoaderTests(unittest.TestCase):
         with zipfile.ZipFile(buffer, mode="w") as archive:
             archive.writestr(filename, contents)
         return buffer.getvalue()
+
+
+class LocalFactorConstructionTests(unittest.TestCase):
+    def test_build_extension_factors_returns_expected_columns_with_non_null_values_after_warmup(self):
+        prices = self._make_price_frame(benchmark="SPY")
+
+        factors = build_extension_factors(prices, benchmark="SPY", market="us")
+
+        self.assertListEqual(list(factors.columns), ["MOM", "LOWVOL", "RECOVERY"])
+        self.assertTrue(factors.iloc[260:].notna().all().all())
+        self.assertGreater(factors.iloc[260:].abs().sum().sum(), 0.0)
+
+    def test_build_proxy_core_factors_returns_expected_columns_with_non_null_values_after_warmup(self):
+        prices = self._make_price_frame(benchmark="000300.SS")
+
+        factors = build_proxy_core_factors(prices, benchmark="000300.SS", market="cn")
+
+        self.assertListEqual(
+            list(factors.columns),
+            ["MKT", "SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY"],
+        )
+        self.assertTrue(factors.iloc[260:].notna().all().all())
+        self.assertGreater(factors.iloc[260:].abs().sum().sum(), 0.0)
+
+    def test_build_proxy_core_factors_market_factor_ignores_input_row_order(self):
+        prices = self._make_price_frame(benchmark="SPY")
+
+        expected = build_proxy_core_factors(prices, benchmark="SPY", market="us")
+        actual = build_proxy_core_factors(prices.iloc[::-1], benchmark="SPY", market="us")
+
+        pd.testing.assert_series_equal(
+            actual["MKT"].sort_index(),
+            expected["MKT"],
+            check_freq=False,
+        )
+
+    def _make_price_frame(self, benchmark: str) -> pd.DataFrame:
+        dates = pd.date_range("2025-01-01", periods=320, freq="B")
+        steps = np.arange(len(dates), dtype=float)
+        symbols = [
+            ("A", 45.0, 0.0006, 0.030, 19.0, 0.1),
+            ("B", 60.0, 0.0003, 0.025, 23.0, 0.8),
+            ("C", 75.0, -0.0002, 0.035, 17.0, 1.4),
+            ("D", 90.0, 0.0008, 0.020, 29.0, 0.5),
+            ("E", 55.0, -0.0001, 0.028, 31.0, 1.9),
+            ("F", 70.0, 0.0005, 0.032, 21.0, 2.5),
+        ]
+        data = {}
+        for symbol, base, drift, amplitude, frequency, phase in symbols:
+            log_path = drift * steps + amplitude * np.sin(steps / frequency + phase)
+            data[symbol] = base * np.exp(log_path)
+
+        benchmark_path = 0.0004 * steps + 0.018 * np.sin(steps / 27.0 + 0.3)
+        data[benchmark] = 250.0 * np.exp(benchmark_path)
+        return pd.DataFrame(data, index=dates)