Integrate factor attribution into backtest CLI
This commit is contained in:
@@ -32,6 +32,57 @@ PROXY_FACTOR_COLUMNS = [
|
||||
"CMA_PROXY",
|
||||
] + EXTENSION_FACTOR_COLUMNS
|
||||
TRADING_DAYS_PER_YEAR = 252
|
||||
SUMMARY_BETA_COLUMN_BY_FACTOR = {
|
||||
"MKT_RF": "beta_mkt",
|
||||
"MKT": "beta_mkt",
|
||||
"SMB": "beta_smb",
|
||||
"SMB_PROXY": "beta_smb",
|
||||
"HML": "beta_hml",
|
||||
"HML_PROXY": "beta_hml",
|
||||
"RMW": "beta_rmw",
|
||||
"RMW_PROXY": "beta_rmw",
|
||||
"CMA": "beta_cma",
|
||||
"CMA_PROXY": "beta_cma",
|
||||
"MOM": "beta_mom",
|
||||
"LOWVOL": "beta_lowvol",
|
||||
"RECOVERY": "beta_recovery",
|
||||
}
|
||||
SUMMARY_COLUMNS = [
|
||||
"strategy",
|
||||
"market",
|
||||
"model",
|
||||
"factor_source",
|
||||
"proxy_only",
|
||||
"start_date",
|
||||
"end_date",
|
||||
"n_obs",
|
||||
"alpha_daily",
|
||||
"alpha_ann",
|
||||
"alpha_t_stat",
|
||||
"alpha_p_value",
|
||||
"r_squared",
|
||||
"adj_r_squared",
|
||||
"residual_vol_ann",
|
||||
"beta_mkt",
|
||||
"beta_smb",
|
||||
"beta_hml",
|
||||
"beta_rmw",
|
||||
"beta_cma",
|
||||
"beta_mom",
|
||||
"beta_lowvol",
|
||||
"beta_recovery",
|
||||
]
|
||||
LOADING_COLUMNS = [
|
||||
"strategy",
|
||||
"market",
|
||||
"model",
|
||||
"factor_source",
|
||||
"proxy_only",
|
||||
"factor",
|
||||
"beta",
|
||||
"t_stat",
|
||||
"p_value",
|
||||
]
|
||||
|
||||
|
||||
class ExternalFactorFormatError(ValueError):
|
||||
@@ -358,3 +409,218 @@ def run_factor_regression(
|
||||
"end_date": regression_frame.index.max().date().isoformat(),
|
||||
"n_obs": n_obs,
|
||||
}
|
||||
|
||||
|
||||
def _empty_attribution_frames() -> tuple[pd.DataFrame, pd.DataFrame]:
|
||||
return (
|
||||
pd.DataFrame(columns=SUMMARY_COLUMNS),
|
||||
pd.DataFrame(columns=LOADING_COLUMNS),
|
||||
)
|
||||
|
||||
|
||||
def _select_model_names(
|
||||
model_selection: str,
|
||||
available_models: dict[str, list[str]],
|
||||
) -> list[str]:
|
||||
if model_selection == "all":
|
||||
return list(available_models)
|
||||
if model_selection in available_models:
|
||||
return [model_selection]
|
||||
return list(available_models)
|
||||
|
||||
|
||||
def attribute_strategies(
|
||||
results_df: pd.DataFrame,
|
||||
benchmark_label: str,
|
||||
price_data: pd.DataFrame,
|
||||
market: str,
|
||||
model_selection: str = "all",
|
||||
benchmark: str | None = None,
|
||||
external_factors: pd.DataFrame | None = None,
|
||||
) -> tuple[pd.DataFrame, pd.DataFrame]:
|
||||
benchmark_symbol = benchmark
|
||||
if benchmark_symbol is None:
|
||||
matching_columns = [column for column in price_data.columns if column in benchmark_label]
|
||||
benchmark_symbol = matching_columns[0] if matching_columns else price_data.columns[-1]
|
||||
|
||||
extension_factors = build_extension_factors(price_data, benchmark=benchmark_symbol, market=market)
|
||||
|
||||
resolved_external_factors = external_factors
|
||||
market_name = market.lower()
|
||||
if market_name == "us" and resolved_external_factors is None:
|
||||
try:
|
||||
resolved_external_factors = load_external_us_factors()
|
||||
except (ExternalFactorDownloadError, ExternalFactorFormatError, zipfile.BadZipFile) as exc:
|
||||
warnings.warn(
|
||||
f"Falling back to proxy factor attribution because external US factors were unavailable: {exc}",
|
||||
UserWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
resolved_external_factors = None
|
||||
|
||||
proxy_factors = None
|
||||
if market_name != "us" or resolved_external_factors is None:
|
||||
proxy_factors = build_proxy_core_factors(price_data, benchmark=benchmark_symbol, market=market)
|
||||
|
||||
prepared = prepare_factor_models(
|
||||
market=market,
|
||||
extension_factors=extension_factors,
|
||||
proxy_factors=proxy_factors,
|
||||
external_factors=resolved_external_factors,
|
||||
)
|
||||
model_names = _select_model_names(model_selection, prepared["models"])
|
||||
|
||||
strategy_returns = results_df.sort_index().pct_change(fill_method=None)
|
||||
if strategy_returns.empty:
|
||||
return _empty_attribution_frames()
|
||||
|
||||
summary_rows: list[dict[str, object]] = []
|
||||
loading_rows: list[dict[str, object]] = []
|
||||
for strategy_name in strategy_returns.columns:
|
||||
if strategy_name == benchmark_label:
|
||||
continue
|
||||
|
||||
for model_name in model_names:
|
||||
factor_cols = prepared["models"][model_name]
|
||||
try:
|
||||
regression_result = run_factor_regression(
|
||||
strategy_returns=strategy_returns[strategy_name],
|
||||
factor_frame=prepared["factor_frame"],
|
||||
factor_cols=factor_cols,
|
||||
risk_free_col=prepared["risk_free_col"],
|
||||
)
|
||||
except ValueError as exc:
|
||||
warnings.warn(
|
||||
f"Skipping factor attribution for {strategy_name} ({model_name}): {exc}",
|
||||
UserWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
continue
|
||||
|
||||
summary_row: dict[str, object] = {
|
||||
"strategy": strategy_name,
|
||||
"market": market_name,
|
||||
"model": model_name,
|
||||
"factor_source": prepared["factor_source"],
|
||||
"proxy_only": prepared["proxy_only"],
|
||||
"start_date": regression_result["start_date"],
|
||||
"end_date": regression_result["end_date"],
|
||||
"n_obs": regression_result["n_obs"],
|
||||
"alpha_daily": regression_result["alpha_daily"],
|
||||
"alpha_ann": regression_result["alpha_ann"],
|
||||
"alpha_t_stat": regression_result["alpha_t_stat"],
|
||||
"alpha_p_value": regression_result["alpha_p_value"],
|
||||
"r_squared": regression_result["r_squared"],
|
||||
"adj_r_squared": regression_result["adj_r_squared"],
|
||||
"residual_vol_ann": regression_result["residual_vol_ann"],
|
||||
"beta_mkt": np.nan,
|
||||
"beta_smb": np.nan,
|
||||
"beta_hml": np.nan,
|
||||
"beta_rmw": np.nan,
|
||||
"beta_cma": np.nan,
|
||||
"beta_mom": np.nan,
|
||||
"beta_lowvol": np.nan,
|
||||
"beta_recovery": np.nan,
|
||||
}
|
||||
for factor_name, beta in regression_result["betas"].items():
|
||||
summary_column = SUMMARY_BETA_COLUMN_BY_FACTOR.get(factor_name)
|
||||
if summary_column is not None:
|
||||
summary_row[summary_column] = beta
|
||||
loading_rows.append(
|
||||
{
|
||||
"strategy": strategy_name,
|
||||
"market": market_name,
|
||||
"model": model_name,
|
||||
"factor_source": prepared["factor_source"],
|
||||
"proxy_only": prepared["proxy_only"],
|
||||
"factor": factor_name,
|
||||
"beta": beta,
|
||||
"t_stat": regression_result["t_stats"][factor_name],
|
||||
"p_value": regression_result["p_values"][factor_name],
|
||||
}
|
||||
)
|
||||
|
||||
summary_rows.append(summary_row)
|
||||
|
||||
summary_df = pd.DataFrame(summary_rows, columns=SUMMARY_COLUMNS)
|
||||
loadings_df = pd.DataFrame(loading_rows, columns=LOADING_COLUMNS)
|
||||
return summary_df, loadings_df
|
||||
|
||||
|
||||
def export_attribution(
|
||||
summary_df: pd.DataFrame,
|
||||
loadings_df: pd.DataFrame,
|
||||
output_dir: Path | str,
|
||||
) -> None:
|
||||
output_path = Path(output_dir)
|
||||
output_path.mkdir(parents=True, exist_ok=True)
|
||||
summary_df.to_csv(output_path / "summary.csv", index=False)
|
||||
loadings_df.to_csv(output_path / "loadings.csv", index=False)
|
||||
|
||||
|
||||
def _describe_alpha(alpha_ann: float) -> str:
|
||||
if alpha_ann > 0.02:
|
||||
return "positive"
|
||||
if alpha_ann < -0.02:
|
||||
return "negative"
|
||||
return "close to flat"
|
||||
|
||||
|
||||
def _describe_fit(r_squared: float) -> str:
|
||||
if r_squared >= 0.75:
|
||||
return "strong"
|
||||
if r_squared >= 0.4:
|
||||
return "moderate"
|
||||
return "weak"
|
||||
|
||||
|
||||
def _top_loading_descriptions(row: pd.Series, limit: int = 2) -> str:
|
||||
beta_columns = [column for column in SUMMARY_COLUMNS if column.startswith("beta_")]
|
||||
present = []
|
||||
for column in beta_columns:
|
||||
value = row.get(column)
|
||||
if pd.notna(value):
|
||||
present.append((column.removeprefix("beta_").upper(), float(value)))
|
||||
|
||||
if not present:
|
||||
return "no material factor loadings were estimated"
|
||||
|
||||
top_loadings = sorted(present, key=lambda item: abs(item[1]), reverse=True)[:limit]
|
||||
return ", ".join(f"{name} {value:.2f}" for name, value in top_loadings)
|
||||
|
||||
|
||||
def print_attribution_summary(summary_df: pd.DataFrame) -> None:
|
||||
if summary_df.empty:
|
||||
print("Factor attribution: no usable regressions were produced.")
|
||||
return
|
||||
|
||||
display_columns = [
|
||||
"strategy",
|
||||
"market",
|
||||
"model",
|
||||
"alpha_ann",
|
||||
"r_squared",
|
||||
"residual_vol_ann",
|
||||
"beta_mkt",
|
||||
"beta_smb",
|
||||
"beta_hml",
|
||||
"beta_rmw",
|
||||
"beta_cma",
|
||||
"beta_mom",
|
||||
"beta_lowvol",
|
||||
"beta_recovery",
|
||||
]
|
||||
table = summary_df.loc[:, display_columns].copy()
|
||||
numeric_columns = [column for column in display_columns if column not in {"strategy", "market", "model"}]
|
||||
table.loc[:, numeric_columns] = table.loc[:, numeric_columns].round(4)
|
||||
|
||||
print("\nFactor attribution")
|
||||
print(table.to_string(index=False, na_rep=""))
|
||||
print("\nInterpretation")
|
||||
for _, row in summary_df.iterrows():
|
||||
print(
|
||||
f"- {row['strategy']} / {row['model']}: estimated annualized alpha is "
|
||||
f"{_describe_alpha(float(row['alpha_ann']))} ({row['alpha_ann']:.2%}); "
|
||||
f"strongest loadings are {_top_loading_descriptions(row)}; "
|
||||
f"model fit looks {_describe_fit(float(row['r_squared']))} (R^2={row['r_squared']:.2f})."
|
||||
)
|
||||
|
||||
27
main.py
27
main.py
@@ -5,6 +5,7 @@ import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
import data_manager
|
||||
import factor_attribution
|
||||
import metrics
|
||||
from strategies.adaptive_momentum import AdaptiveMomentumStrategy
|
||||
from strategies.buy_and_hold import BuyAndHoldStrategy
|
||||
@@ -163,6 +164,18 @@ def main() -> None:
|
||||
help="Execution mode: 'close' (default, signal & execute on close) or "
|
||||
"'open-close' (signal on morning open, execute at close)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--attribution", action="store_true",
|
||||
help="Run factor attribution after performance metrics",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--attribution-model", choices=["capm", "ff5", "ff5plus", "all"], default="all",
|
||||
help="Factor model selection for attribution output",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--attribution-export", default=None,
|
||||
help="Directory to export factor attribution CSVs",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
initial_capital = args.capital if args.capital is not None else 10_000
|
||||
use_open = args.execution == "open-close"
|
||||
@@ -238,6 +251,20 @@ def main() -> None:
|
||||
continue
|
||||
metrics.summary(eq, name=name)
|
||||
|
||||
if args.attribution:
|
||||
summary_df, loadings_df = factor_attribution.attribute_strategies(
|
||||
results_df=results_df,
|
||||
benchmark_label=benchmark_label,
|
||||
benchmark=benchmark,
|
||||
price_data=data,
|
||||
market=args.market,
|
||||
model_selection=args.attribution_model,
|
||||
)
|
||||
factor_attribution.print_attribution_summary(summary_df)
|
||||
if args.attribution_export:
|
||||
factor_attribution.export_attribution(summary_df, loadings_df, args.attribution_export)
|
||||
print(f"Attribution CSVs written to {args.attribution_export}")
|
||||
|
||||
# --- Visualization ---
|
||||
if not args.no_plot:
|
||||
plot_results(results_df.dropna())
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import http.client
|
||||
import contextlib
|
||||
import io
|
||||
import socket
|
||||
import ssl
|
||||
@@ -18,9 +19,12 @@ from factor_attribution import (
|
||||
KEN_FRENCH_DAILY_FF5_ZIP_URL,
|
||||
_download_kf_zip_bytes,
|
||||
_parse_kf_daily_csv,
|
||||
attribute_strategies,
|
||||
build_extension_factors,
|
||||
build_proxy_core_factors,
|
||||
export_attribution,
|
||||
load_external_us_factors,
|
||||
print_attribution_summary,
|
||||
prepare_factor_models,
|
||||
run_factor_regression,
|
||||
)
|
||||
@@ -573,3 +577,194 @@ class RegressionTests(unittest.TestCase):
|
||||
list(prepared["factor_frame"].columns),
|
||||
["MKT", "SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY", "MOM", "LOWVOL", "RECOVERY"],
|
||||
)
|
||||
|
||||
|
||||
class AttributionIntegrationTests(unittest.TestCase):
|
||||
def test_attribute_strategies_exports_standard_model_summary_and_loadings(self):
|
||||
dates = pd.date_range("2025-01-01", periods=320, freq="B")
|
||||
angles = np.linspace(0.0, 24.0, len(dates))
|
||||
factors = pd.DataFrame(
|
||||
{
|
||||
"MKT_RF": 0.010 * np.sin(angles),
|
||||
"SMB": 0.006 * np.cos(angles * 0.7),
|
||||
"HML": 0.004 * np.sin(angles * 1.3 + 0.4),
|
||||
"RMW": 0.003 * np.cos(angles * 1.1 + 0.2),
|
||||
"CMA": 0.002 * np.sin(angles * 0.5 + 0.8),
|
||||
"RF": np.full(len(dates), 0.0001),
|
||||
},
|
||||
index=dates,
|
||||
)
|
||||
strategy_returns = (
|
||||
0.0004
|
||||
+ 1.10 * factors["MKT_RF"]
|
||||
- 0.25 * factors["SMB"]
|
||||
+ 0.35 * factors["HML"]
|
||||
+ 0.10 * factors["RMW"]
|
||||
- 0.05 * factors["CMA"]
|
||||
+ factors["RF"]
|
||||
)
|
||||
benchmark_returns = 0.95 * factors["MKT_RF"] + factors["RF"]
|
||||
results = pd.DataFrame(
|
||||
{
|
||||
"Strategy": 100_000.0 * (1.0 + strategy_returns).cumprod(),
|
||||
"SPY (Benchmark)": 100_000.0 * (1.0 + benchmark_returns).cumprod(),
|
||||
},
|
||||
index=dates,
|
||||
)
|
||||
prices = self._make_price_frame(dates, benchmark="SPY")
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
summary, loadings = attribute_strategies(
|
||||
results_df=results,
|
||||
benchmark_label="SPY (Benchmark)",
|
||||
benchmark="SPY",
|
||||
price_data=prices,
|
||||
market="us",
|
||||
model_selection="ff5",
|
||||
external_factors=factors,
|
||||
)
|
||||
export_attribution(summary, loadings, tmpdir)
|
||||
|
||||
self.assertTrue((Path(tmpdir) / "summary.csv").exists())
|
||||
self.assertTrue((Path(tmpdir) / "loadings.csv").exists())
|
||||
|
||||
exported_summary = pd.read_csv(Path(tmpdir) / "summary.csv")
|
||||
exported_loadings = pd.read_csv(Path(tmpdir) / "loadings.csv")
|
||||
|
||||
self.assertEqual(len(summary), 1)
|
||||
self.assertListEqual(
|
||||
list(summary.columns),
|
||||
[
|
||||
"strategy",
|
||||
"market",
|
||||
"model",
|
||||
"factor_source",
|
||||
"proxy_only",
|
||||
"start_date",
|
||||
"end_date",
|
||||
"n_obs",
|
||||
"alpha_daily",
|
||||
"alpha_ann",
|
||||
"alpha_t_stat",
|
||||
"alpha_p_value",
|
||||
"r_squared",
|
||||
"adj_r_squared",
|
||||
"residual_vol_ann",
|
||||
"beta_mkt",
|
||||
"beta_smb",
|
||||
"beta_hml",
|
||||
"beta_rmw",
|
||||
"beta_cma",
|
||||
"beta_mom",
|
||||
"beta_lowvol",
|
||||
"beta_recovery",
|
||||
],
|
||||
)
|
||||
self.assertEqual(summary.loc[0, "strategy"], "Strategy")
|
||||
self.assertEqual(summary.loc[0, "model"], "ff5")
|
||||
self.assertEqual(summary.loc[0, "factor_source"], "external+local")
|
||||
self.assertFalse(bool(summary.loc[0, "proxy_only"]))
|
||||
self.assertAlmostEqual(summary.loc[0, "beta_mkt"], 1.10, places=3)
|
||||
self.assertAlmostEqual(summary.loc[0, "beta_smb"], -0.25, places=3)
|
||||
self.assertAlmostEqual(summary.loc[0, "beta_hml"], 0.35, places=3)
|
||||
self.assertTrue(np.isnan(summary.loc[0, "beta_mom"]))
|
||||
|
||||
self.assertListEqual(
|
||||
list(loadings.columns),
|
||||
["strategy", "market", "model", "factor_source", "proxy_only", "factor", "beta", "t_stat", "p_value"],
|
||||
)
|
||||
self.assertEqual(set(loadings["factor"]), {"MKT_RF", "SMB", "HML", "RMW", "CMA"})
|
||||
self.assertEqual(len(loadings), 5)
|
||||
pd.testing.assert_frame_equal(summary, exported_summary, check_dtype=False)
|
||||
pd.testing.assert_frame_equal(loadings, exported_loadings, check_dtype=False)
|
||||
|
||||
def test_attribute_strategies_uses_proxy_model_for_cn_runs(self):
|
||||
dates = pd.date_range("2025-01-01", periods=320, freq="B")
|
||||
prices = self._make_price_frame(dates, benchmark="000300.SS")
|
||||
returns = prices["000300.SS"].pct_change().fillna(0.0) * 0.7 + 0.0002
|
||||
results = pd.DataFrame(
|
||||
{
|
||||
"Strategy": 100_000.0 * (1.0 + returns).cumprod(),
|
||||
"CSI 300 (Benchmark)": 100_000.0 * (1.0 + prices["000300.SS"].pct_change().fillna(0.0)).cumprod(),
|
||||
},
|
||||
index=dates,
|
||||
)
|
||||
|
||||
summary, loadings = attribute_strategies(
|
||||
results_df=results,
|
||||
benchmark_label="CSI 300 (Benchmark)",
|
||||
benchmark="000300.SS",
|
||||
price_data=prices,
|
||||
market="cn",
|
||||
model_selection="ff5",
|
||||
external_factors=None,
|
||||
)
|
||||
|
||||
self.assertEqual(len(summary), 1)
|
||||
self.assertEqual(summary.loc[0, "model"], "proxy")
|
||||
self.assertEqual(summary.loc[0, "factor_source"], "proxy_only")
|
||||
self.assertTrue(bool(summary.loc[0, "proxy_only"]))
|
||||
self.assertEqual(
|
||||
set(loadings["factor"]),
|
||||
{"MKT", "SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY", "MOM", "LOWVOL", "RECOVERY"},
|
||||
)
|
||||
|
||||
def test_print_attribution_summary_prints_compact_table_and_interpretation(self):
|
||||
summary = pd.DataFrame(
|
||||
[
|
||||
{
|
||||
"strategy": "Strategy",
|
||||
"market": "us",
|
||||
"model": "ff5",
|
||||
"factor_source": "external+local",
|
||||
"proxy_only": False,
|
||||
"start_date": "2025-01-02",
|
||||
"end_date": "2026-03-24",
|
||||
"n_obs": 319,
|
||||
"alpha_daily": 0.0004,
|
||||
"alpha_ann": 0.1008,
|
||||
"alpha_t_stat": 2.1,
|
||||
"alpha_p_value": 0.04,
|
||||
"r_squared": 0.82,
|
||||
"adj_r_squared": 0.81,
|
||||
"residual_vol_ann": 0.12,
|
||||
"beta_mkt": 1.05,
|
||||
"beta_smb": -0.20,
|
||||
"beta_hml": 0.30,
|
||||
"beta_rmw": 0.05,
|
||||
"beta_cma": np.nan,
|
||||
"beta_mom": np.nan,
|
||||
"beta_lowvol": np.nan,
|
||||
"beta_recovery": np.nan,
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
buffer = io.StringIO()
|
||||
with contextlib.redirect_stdout(buffer):
|
||||
print_attribution_summary(summary)
|
||||
|
||||
output = buffer.getvalue()
|
||||
self.assertIn("Factor attribution", output)
|
||||
self.assertIn("Strategy", output)
|
||||
self.assertIn("ff5", output)
|
||||
self.assertIn("alpha_ann", output)
|
||||
self.assertIn("Interpretation", output)
|
||||
|
||||
def _make_price_frame(self, dates: pd.DatetimeIndex, benchmark: str) -> pd.DataFrame:
|
||||
steps = np.arange(len(dates), dtype=float)
|
||||
data = {}
|
||||
for symbol, base, drift, amplitude, frequency, phase in (
|
||||
("AAA", 45.0, 0.0005, 0.030, 19.0, 0.1),
|
||||
("BBB", 60.0, 0.0002, 0.025, 23.0, 0.8),
|
||||
("CCC", 75.0, -0.0001, 0.035, 17.0, 1.4),
|
||||
("DDD", 90.0, 0.0007, 0.020, 29.0, 0.5),
|
||||
("EEE", 55.0, -0.0002, 0.028, 31.0, 1.9),
|
||||
("FFF", 70.0, 0.0004, 0.032, 21.0, 2.5),
|
||||
):
|
||||
log_path = drift * steps + amplitude * np.sin(steps / frequency + phase)
|
||||
data[symbol] = base * np.exp(log_path)
|
||||
|
||||
benchmark_path = 0.0004 * steps + 0.018 * np.sin(steps / 27.0 + 0.3)
|
||||
data[benchmark] = 250.0 * np.exp(benchmark_path)
|
||||
return pd.DataFrame(data, index=dates)
|
||||
|
||||
Reference in New Issue
Block a user