diff --git a/factor_attribution.py b/factor_attribution.py index 156fbd7..16ca955 100644 --- a/factor_attribution.py +++ b/factor_attribution.py @@ -32,6 +32,57 @@ PROXY_FACTOR_COLUMNS = [ "CMA_PROXY", ] + EXTENSION_FACTOR_COLUMNS TRADING_DAYS_PER_YEAR = 252 +SUMMARY_BETA_COLUMN_BY_FACTOR = { + "MKT_RF": "beta_mkt", + "MKT": "beta_mkt", + "SMB": "beta_smb", + "SMB_PROXY": "beta_smb", + "HML": "beta_hml", + "HML_PROXY": "beta_hml", + "RMW": "beta_rmw", + "RMW_PROXY": "beta_rmw", + "CMA": "beta_cma", + "CMA_PROXY": "beta_cma", + "MOM": "beta_mom", + "LOWVOL": "beta_lowvol", + "RECOVERY": "beta_recovery", +} +SUMMARY_COLUMNS = [ + "strategy", + "market", + "model", + "factor_source", + "proxy_only", + "start_date", + "end_date", + "n_obs", + "alpha_daily", + "alpha_ann", + "alpha_t_stat", + "alpha_p_value", + "r_squared", + "adj_r_squared", + "residual_vol_ann", + "beta_mkt", + "beta_smb", + "beta_hml", + "beta_rmw", + "beta_cma", + "beta_mom", + "beta_lowvol", + "beta_recovery", +] +LOADING_COLUMNS = [ + "strategy", + "market", + "model", + "factor_source", + "proxy_only", + "factor", + "beta", + "t_stat", + "p_value", +] class ExternalFactorFormatError(ValueError): @@ -358,3 +409,218 @@ def run_factor_regression( "end_date": regression_frame.index.max().date().isoformat(), "n_obs": n_obs, } + + +def _empty_attribution_frames() -> tuple[pd.DataFrame, pd.DataFrame]: + return ( + pd.DataFrame(columns=SUMMARY_COLUMNS), + pd.DataFrame(columns=LOADING_COLUMNS), + ) + + +def _select_model_names( + model_selection: str, + available_models: dict[str, list[str]], +) -> list[str]: + if model_selection == "all": + return list(available_models) + if model_selection in available_models: + return [model_selection] + return list(available_models) + + +def attribute_strategies( + results_df: pd.DataFrame, + benchmark_label: str, + price_data: pd.DataFrame, + market: str, + model_selection: str = "all", + benchmark: str | None = None, + external_factors: pd.DataFrame | None = None, 
+) -> tuple[pd.DataFrame, pd.DataFrame]: + benchmark_symbol = benchmark + if benchmark_symbol is None: + matching_columns = [column for column in price_data.columns if column in benchmark_label] + benchmark_symbol = matching_columns[0] if matching_columns else price_data.columns[-1] + + extension_factors = build_extension_factors(price_data, benchmark=benchmark_symbol, market=market) + + resolved_external_factors = external_factors + market_name = market.lower() + if market_name == "us" and resolved_external_factors is None: + try: + resolved_external_factors = load_external_us_factors() + except (ExternalFactorDownloadError, ExternalFactorFormatError, zipfile.BadZipFile) as exc: + warnings.warn( + f"Falling back to proxy factor attribution because external US factors were unavailable: {exc}", + UserWarning, + stacklevel=2, + ) + resolved_external_factors = None + + proxy_factors = None + if market_name != "us" or resolved_external_factors is None: + proxy_factors = build_proxy_core_factors(price_data, benchmark=benchmark_symbol, market=market) + + prepared = prepare_factor_models( + market=market, + extension_factors=extension_factors, + proxy_factors=proxy_factors, + external_factors=resolved_external_factors, + ) + model_names = _select_model_names(model_selection, prepared["models"]) + + strategy_returns = results_df.sort_index().pct_change(fill_method=None) + if strategy_returns.empty: + return _empty_attribution_frames() + + summary_rows: list[dict[str, object]] = [] + loading_rows: list[dict[str, object]] = [] + for strategy_name in strategy_returns.columns: + if strategy_name == benchmark_label: + continue + + for model_name in model_names: + factor_cols = prepared["models"][model_name] + try: + regression_result = run_factor_regression( + strategy_returns=strategy_returns[strategy_name], + factor_frame=prepared["factor_frame"], + factor_cols=factor_cols, + risk_free_col=prepared["risk_free_col"], + ) + except ValueError as exc: + warnings.warn( + 
f"Skipping factor attribution for {strategy_name} ({model_name}): {exc}", + UserWarning, + stacklevel=2, + ) + continue + + summary_row: dict[str, object] = { + "strategy": strategy_name, + "market": market_name, + "model": model_name, + "factor_source": prepared["factor_source"], + "proxy_only": prepared["proxy_only"], + "start_date": regression_result["start_date"], + "end_date": regression_result["end_date"], + "n_obs": regression_result["n_obs"], + "alpha_daily": regression_result["alpha_daily"], + "alpha_ann": regression_result["alpha_ann"], + "alpha_t_stat": regression_result["alpha_t_stat"], + "alpha_p_value": regression_result["alpha_p_value"], + "r_squared": regression_result["r_squared"], + "adj_r_squared": regression_result["adj_r_squared"], + "residual_vol_ann": regression_result["residual_vol_ann"], + "beta_mkt": np.nan, + "beta_smb": np.nan, + "beta_hml": np.nan, + "beta_rmw": np.nan, + "beta_cma": np.nan, + "beta_mom": np.nan, + "beta_lowvol": np.nan, + "beta_recovery": np.nan, + } + for factor_name, beta in regression_result["betas"].items(): + summary_column = SUMMARY_BETA_COLUMN_BY_FACTOR.get(factor_name) + if summary_column is not None: + summary_row[summary_column] = beta + loading_rows.append( + { + "strategy": strategy_name, + "market": market_name, + "model": model_name, + "factor_source": prepared["factor_source"], + "proxy_only": prepared["proxy_only"], + "factor": factor_name, + "beta": beta, + "t_stat": regression_result["t_stats"][factor_name], + "p_value": regression_result["p_values"][factor_name], + } + ) + + summary_rows.append(summary_row) + + summary_df = pd.DataFrame(summary_rows, columns=SUMMARY_COLUMNS) + loadings_df = pd.DataFrame(loading_rows, columns=LOADING_COLUMNS) + return summary_df, loadings_df + + +def export_attribution( + summary_df: pd.DataFrame, + loadings_df: pd.DataFrame, + output_dir: Path | str, +) -> None: + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + 
summary_df.to_csv(output_path / "summary.csv", index=False) + loadings_df.to_csv(output_path / "loadings.csv", index=False) + + +def _describe_alpha(alpha_ann: float) -> str: + if alpha_ann > 0.02: + return "positive" + if alpha_ann < -0.02: + return "negative" + return "close to flat" + + +def _describe_fit(r_squared: float) -> str: + if r_squared >= 0.75: + return "strong" + if r_squared >= 0.4: + return "moderate" + return "weak" + + +def _top_loading_descriptions(row: pd.Series, limit: int = 2) -> str: + beta_columns = [column for column in SUMMARY_COLUMNS if column.startswith("beta_")] + present = [] + for column in beta_columns: + value = row.get(column) + if pd.notna(value): + present.append((column.removeprefix("beta_").upper(), float(value))) + + if not present: + return "no material factor loadings were estimated" + + top_loadings = sorted(present, key=lambda item: abs(item[1]), reverse=True)[:limit] + return ", ".join(f"{name} {value:.2f}" for name, value in top_loadings) + + +def print_attribution_summary(summary_df: pd.DataFrame) -> None: + if summary_df.empty: + print("Factor attribution: no usable regressions were produced.") + return + + display_columns = [ + "strategy", + "market", + "model", + "alpha_ann", + "r_squared", + "residual_vol_ann", + "beta_mkt", + "beta_smb", + "beta_hml", + "beta_rmw", + "beta_cma", + "beta_mom", + "beta_lowvol", + "beta_recovery", + ] + table = summary_df.loc[:, display_columns].copy() + numeric_columns = [column for column in display_columns if column not in {"strategy", "market", "model"}] + table.loc[:, numeric_columns] = table.loc[:, numeric_columns].round(4) + + print("\nFactor attribution") + print(table.to_string(index=False, na_rep="")) + print("\nInterpretation") + for _, row in summary_df.iterrows(): + print( + f"- {row['strategy']} / {row['model']}: estimated annualized alpha is " + f"{_describe_alpha(float(row['alpha_ann']))} ({row['alpha_ann']:.2%}); " + f"strongest loadings are 
{_top_loading_descriptions(row)}; " + f"model fit looks {_describe_fit(float(row['r_squared']))} (R^2={row['r_squared']:.2f})." + ) diff --git a/main.py b/main.py index e870cb2..8a40e3e 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd import data_manager +import factor_attribution import metrics from strategies.adaptive_momentum import AdaptiveMomentumStrategy from strategies.buy_and_hold import BuyAndHoldStrategy @@ -163,6 +164,18 @@ def main() -> None: help="Execution mode: 'close' (default, signal & execute on close) or " "'open-close' (signal on morning open, execute at close)", ) + parser.add_argument( + "--attribution", action="store_true", + help="Run factor attribution after performance metrics", + ) + parser.add_argument( + "--attribution-model", choices=["capm", "ff5", "ff5plus", "all"], default="all", + help="Factor model selection for attribution output", + ) + parser.add_argument( + "--attribution-export", default=None, + help="Directory to export factor attribution CSVs", + ) args = parser.parse_args() initial_capital = args.capital if args.capital is not None else 10_000 use_open = args.execution == "open-close" @@ -238,6 +251,20 @@ def main() -> None: continue metrics.summary(eq, name=name) + if args.attribution: + summary_df, loadings_df = factor_attribution.attribute_strategies( + results_df=results_df, + benchmark_label=benchmark_label, + benchmark=benchmark, + price_data=data, + market=args.market, + model_selection=args.attribution_model, + ) + factor_attribution.print_attribution_summary(summary_df) + if args.attribution_export: + factor_attribution.export_attribution(summary_df, loadings_df, args.attribution_export) + print(f"Attribution CSVs written to {args.attribution_export}") + # --- Visualization --- if not args.no_plot: plot_results(results_df.dropna()) diff --git a/tests/test_factor_attribution.py b/tests/test_factor_attribution.py index 8628639..170ad80 100644 --- 
# Names this class needs from the test module: the change set adds
# ``import contextlib`` and imports ``attribute_strategies``,
# ``export_attribution`` and ``print_attribution_summary`` from
# ``factor_attribution``; ``io``, ``tempfile``, ``unittest``, ``Path``,
# ``numpy as np`` and ``pandas as pd`` must be in scope as well.
class AttributionIntegrationTests(unittest.TestCase):
    """End-to-end checks for attribution, CSV export and console reporting."""

    def test_attribute_strategies_exports_standard_model_summary_and_loadings(self):
        """FF5 run on supplied external factors recovers betas and round-trips via CSV."""
        dates = pd.date_range("2025-01-01", periods=320, freq="B")
        n_days = len(dates)
        angles = np.linspace(0.0, 24.0, n_days)
        factors = pd.DataFrame(
            {
                "MKT_RF": 0.010 * np.sin(angles),
                "SMB": 0.006 * np.cos(angles * 0.7),
                "HML": 0.004 * np.sin(angles * 1.3 + 0.4),
                "RMW": 0.003 * np.cos(angles * 1.1 + 0.2),
                "CMA": 0.002 * np.sin(angles * 0.5 + 0.8),
                "RF": np.full(n_days, 0.0001),
            },
            index=dates,
        )
        # Strategy returns are an exact linear combination of the factors, so
        # the regression should recover the coefficients used here.
        strategy_returns = (
            0.0004
            + 1.10 * factors["MKT_RF"]
            - 0.25 * factors["SMB"]
            + 0.35 * factors["HML"]
            + 0.10 * factors["RMW"]
            - 0.05 * factors["CMA"]
            + factors["RF"]
        )
        benchmark_returns = 0.95 * factors["MKT_RF"] + factors["RF"]
        results = pd.DataFrame(
            {
                "Strategy": 100_000.0 * (1.0 + strategy_returns).cumprod(),
                "SPY (Benchmark)": 100_000.0 * (1.0 + benchmark_returns).cumprod(),
            },
            index=dates,
        )
        prices = self._make_price_frame(dates, benchmark="SPY")

        expected_summary_columns = [
            "strategy",
            "market",
            "model",
            "factor_source",
            "proxy_only",
            "start_date",
            "end_date",
            "n_obs",
            "alpha_daily",
            "alpha_ann",
            "alpha_t_stat",
            "alpha_p_value",
            "r_squared",
            "adj_r_squared",
            "residual_vol_ann",
            "beta_mkt",
            "beta_smb",
            "beta_hml",
            "beta_rmw",
            "beta_cma",
            "beta_mom",
            "beta_lowvol",
            "beta_recovery",
        ]
        expected_loading_columns = [
            "strategy",
            "market",
            "model",
            "factor_source",
            "proxy_only",
            "factor",
            "beta",
            "t_stat",
            "p_value",
        ]

        with tempfile.TemporaryDirectory() as tmpdir:
            summary, loadings = attribute_strategies(
                results_df=results,
                benchmark_label="SPY (Benchmark)",
                benchmark="SPY",
                price_data=prices,
                market="us",
                model_selection="ff5",
                external_factors=factors,
            )
            export_attribution(summary, loadings, tmpdir)

            summary_path = Path(tmpdir) / "summary.csv"
            loadings_path = Path(tmpdir) / "loadings.csv"
            self.assertTrue(summary_path.exists())
            self.assertTrue(loadings_path.exists())

            # Read back while the temporary directory still exists.
            exported_summary = pd.read_csv(summary_path)
            exported_loadings = pd.read_csv(loadings_path)

        self.assertEqual(len(summary), 1)
        self.assertListEqual(list(summary.columns), expected_summary_columns)

        first_row = summary.loc[0]
        self.assertEqual(first_row["strategy"], "Strategy")
        self.assertEqual(first_row["model"], "ff5")
        self.assertEqual(first_row["factor_source"], "external+local")
        self.assertFalse(bool(first_row["proxy_only"]))
        self.assertAlmostEqual(first_row["beta_mkt"], 1.10, places=3)
        self.assertAlmostEqual(first_row["beta_smb"], -0.25, places=3)
        self.assertAlmostEqual(first_row["beta_hml"], 0.35, places=3)
        self.assertTrue(np.isnan(first_row["beta_mom"]))

        self.assertListEqual(list(loadings.columns), expected_loading_columns)
        self.assertEqual(set(loadings["factor"]), {"MKT_RF", "SMB", "HML", "RMW", "CMA"})
        self.assertEqual(len(loadings), 5)
        pd.testing.assert_frame_equal(summary, exported_summary, check_dtype=False)
        pd.testing.assert_frame_equal(loadings, exported_loadings, check_dtype=False)

    def test_attribute_strategies_uses_proxy_model_for_cn_runs(self):
        """A CN run asked for ``ff5`` ends up on the proxy-only model."""
        dates = pd.date_range("2025-01-01", periods=320, freq="B")
        prices = self._make_price_frame(dates, benchmark="000300.SS")
        benchmark_returns = prices["000300.SS"].pct_change().fillna(0.0)
        strategy_returns = benchmark_returns * 0.7 + 0.0002
        results = pd.DataFrame(
            {
                "Strategy": 100_000.0 * (1.0 + strategy_returns).cumprod(),
                "CSI 300 (Benchmark)": 100_000.0 * (1.0 + benchmark_returns).cumprod(),
            },
            index=dates,
        )

        summary, loadings = attribute_strategies(
            results_df=results,
            benchmark_label="CSI 300 (Benchmark)",
            benchmark="000300.SS",
            price_data=prices,
            market="cn",
            model_selection="ff5",
            external_factors=None,
        )

        self.assertEqual(len(summary), 1)
        first_row = summary.loc[0]
        self.assertEqual(first_row["model"], "proxy")
        self.assertEqual(first_row["factor_source"], "proxy_only")
        self.assertTrue(bool(first_row["proxy_only"]))
        self.assertEqual(
            set(loadings["factor"]),
            {"MKT", "SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY", "MOM", "LOWVOL", "RECOVERY"},
        )

    def test_print_attribution_summary_prints_compact_table_and_interpretation(self):
        """Console output carries both the table and the interpretation section."""
        record = {
            "strategy": "Strategy",
            "market": "us",
            "model": "ff5",
            "factor_source": "external+local",
            "proxy_only": False,
            "start_date": "2025-01-02",
            "end_date": "2026-03-24",
            "n_obs": 319,
            "alpha_daily": 0.0004,
            "alpha_ann": 0.1008,
            "alpha_t_stat": 2.1,
            "alpha_p_value": 0.04,
            "r_squared": 0.82,
            "adj_r_squared": 0.81,
            "residual_vol_ann": 0.12,
            "beta_mkt": 1.05,
            "beta_smb": -0.20,
            "beta_hml": 0.30,
            "beta_rmw": 0.05,
            "beta_cma": np.nan,
            "beta_mom": np.nan,
            "beta_lowvol": np.nan,
            "beta_recovery": np.nan,
        }
        summary = pd.DataFrame([record])

        captured = io.StringIO()
        with contextlib.redirect_stdout(captured):
            print_attribution_summary(summary)
        output = captured.getvalue()

        for expected_text in ("Factor attribution", "Strategy", "ff5", "alpha_ann", "Interpretation"):
            self.assertIn(expected_text, output)

    def _make_price_frame(self, dates: pd.DatetimeIndex, benchmark: str) -> pd.DataFrame:
        """Build deterministic prices for six symbols plus a *benchmark* column.

        Every series is ``base * exp(drift * t + amplitude * sin(t / frequency + phase))``
        with ``t`` counting rows from zero.
        """
        steps = np.arange(len(dates), dtype=float)
        symbol_specs = (
            ("AAA", 45.0, 0.0005, 0.030, 19.0, 0.1),
            ("BBB", 60.0, 0.0002, 0.025, 23.0, 0.8),
            ("CCC", 75.0, -0.0001, 0.035, 17.0, 1.4),
            ("DDD", 90.0, 0.0007, 0.020, 29.0, 0.5),
            ("EEE", 55.0, -0.0002, 0.028, 31.0, 1.9),
            ("FFF", 70.0, 0.0004, 0.032, 21.0, 2.5),
        )
        data = {
            symbol: base * np.exp(drift * steps + amplitude * np.sin(steps / frequency + phase))
            for symbol, base, drift, amplitude, frequency, phase in symbol_specs
        }
        # The benchmark goes in last so it ends up as the final column.
        data[benchmark] = 250.0 * np.exp(0.0004 * steps + 0.018 * np.sin(steps / 27.0 + 0.3))
        return pd.DataFrame(data, index=dates)