import http.client
import io
import socket
import ssl
import tempfile
import unittest
import zipfile
from pathlib import Path
from unittest import mock
from urllib.error import URLError

import numpy as np
import pandas as pd

from factor_attribution import (
    ExternalFactorDownloadError,
    ExternalFactorFormatError,
    KEN_FRENCH_DAILY_FF5_ZIP_URL,
    _download_kf_zip_bytes,
    _parse_kf_daily_csv,
    build_extension_factors,
    build_proxy_core_factors,
    load_external_us_factors,
)

class ExternalFactorLoaderTests(unittest.TestCase):
    """Tests for downloading, parsing, and caching Ken French FF5 daily data.

    No network traffic ever occurs: ``factor_attribution.urlopen`` (or the
    internal download helper) is always mocked, so every transport and
    payload failure mode is simulated deterministically.
    """

    # File name the loader reads/writes inside its cache directory.
    _CACHE_FILENAME = "ff5_us_daily.csv"
    # CSV member name expected inside the official Ken French zip archive.
    _KF_CSV_NAME = "F-F_Research_Data_5_Factors_2x3_daily.csv"
    # Minimal well-formed daily payload (source quotes values in percent).
    _VALID_CSV_TEXT = (
        "This line is ignored\n"
        ",Mkt-RF,SMB,HML,RMW,CMA,RF\n"
        "20260102,1.00,0.50,-0.25,0.10,-0.05,0.02\n"
        "\n"
    )
    # Payload that lacks the expected factor header row.
    _MALFORMED_CSV_TEXT = "not the expected file format\n20260102,1.00\n"

    def test_download_kf_zip_bytes_fetches_official_ken_french_zip(self):
        response = self._make_response_mock(read_return=b"zip-bytes")

        with mock.patch("factor_attribution.urlopen", return_value=response) as mocked_urlopen:
            raw_bytes = _download_kf_zip_bytes()

        self.assertEqual(raw_bytes, b"zip-bytes")
        request = mocked_urlopen.call_args.args[0]
        self.assertEqual(request.full_url, KEN_FRENCH_DAILY_FF5_ZIP_URL)
        self.assertEqual(mocked_urlopen.call_args.kwargs["timeout"], 30)

    def test_download_kf_zip_bytes_wraps_transport_errors(self):
        # Every plausible transport-layer failure must surface as the
        # package's own download error so callers need only one except clause.
        for error in (
            URLError("boom"),
            TimeoutError("timed out"),
            ConnectionError("conn reset"),
            socket.timeout("socket timed out"),
            socket.gaierror("dns failed"),
            ssl.SSLError("tls failed"),
        ):
            with self.subTest(error_type=type(error).__name__):
                with mock.patch("factor_attribution.urlopen", side_effect=error):
                    with self.assertRaises(ExternalFactorDownloadError):
                        _download_kf_zip_bytes()

    def test_download_kf_zip_bytes_wraps_incomplete_read_errors(self):
        response = self._make_response_mock(
            read_side_effect=http.client.IncompleteRead(b"partial", 10)
        )

        with mock.patch("factor_attribution.urlopen", return_value=response):
            with self.assertRaises(ExternalFactorDownloadError):
                _download_kf_zip_bytes()

    def test_load_external_us_factors_parses_percent_values_and_dates_from_zip_payload(self):
        csv_text = (
            "This line is ignored\n"
            ",Mkt-RF,SMB,HML,RMW,CMA,RF\n"
            "20260102,1.00,0.50,-0.25,0.10,-0.05,0.02\n"
            "20260105,-0.20,0.10,0.30,-0.15,0.05,0.02\n"
            "\n"
        )
        zip_bytes = self._make_zip_bytes(self._KF_CSV_NAME, csv_text)

        with tempfile.TemporaryDirectory() as tmpdir:
            with mock.patch(
                "factor_attribution._download_kf_zip_bytes",
                return_value=zip_bytes,
            ):
                factors = load_external_us_factors(cache_dir=Path(tmpdir))

        self.assertListEqual(
            list(factors.columns),
            ["MKT_RF", "SMB", "HML", "RMW", "CMA", "RF"],
        )
        # Percent units must be converted: 1.00 -> 0.01, 0.02 -> 0.0002.
        self.assertAlmostEqual(factors.iloc[0]["MKT_RF"], 0.01)
        self.assertAlmostEqual(factors.iloc[0]["RF"], 0.0002)
        self.assertEqual(str(factors.index[0].date()), "2026-01-02")

    def test_load_external_us_factors_falls_back_to_cache_when_download_fails(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            cache_dir = Path(tmpdir)
            self._write_cache(cache_dir)
            with mock.patch("factor_attribution.urlopen", side_effect=socket.gaierror("dns failed")):
                with self.assertWarnsRegex(UserWarning, "cached data"):
                    factors = load_external_us_factors(cache_dir=cache_dir)

        self._assert_cached_fallback(factors)

    def test_load_external_us_factors_falls_back_to_cache_when_download_read_is_incomplete(self):
        response = self._make_response_mock(
            read_side_effect=http.client.IncompleteRead(b"partial", 10)
        )

        with tempfile.TemporaryDirectory() as tmpdir:
            cache_dir = Path(tmpdir)
            self._write_cache(cache_dir)
            with mock.patch("factor_attribution.urlopen", return_value=response):
                with self.assertWarnsRegex(UserWarning, "cached data"):
                    factors = load_external_us_factors(cache_dir=cache_dir)

        self._assert_cached_fallback(factors)

    def test_load_external_us_factors_falls_back_to_cache_when_http_status_line_is_bad(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            cache_dir = Path(tmpdir)
            self._write_cache(cache_dir)
            with mock.patch("factor_attribution.urlopen", side_effect=http.client.BadStatusLine("HTTP/1.1 ???")):
                with self.assertWarnsRegex(UserWarning, "cached data"):
                    factors = load_external_us_factors(cache_dir=cache_dir)

        self._assert_cached_fallback(factors)

    def test_parse_kf_daily_csv_raises_external_factor_format_error_for_missing_header(self):
        zip_bytes = self._make_zip_bytes(self._KF_CSV_NAME, self._MALFORMED_CSV_TEXT)

        with self.assertRaises(ExternalFactorFormatError):
            _parse_kf_daily_csv(zip_bytes)

    def test_load_external_us_factors_warns_and_falls_back_to_cache_when_source_format_is_invalid(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            cache_dir = Path(tmpdir)
            self._write_cache(cache_dir)
            malformed_zip_bytes = self._make_zip_bytes(self._KF_CSV_NAME, self._MALFORMED_CSV_TEXT)
            with mock.patch(
                "factor_attribution._download_kf_zip_bytes",
                return_value=malformed_zip_bytes,
            ):
                with self.assertWarnsRegex(UserWarning, "cached data"):
                    factors = load_external_us_factors(cache_dir=cache_dir)

        self._assert_cached_fallback(factors)

    def test_load_external_us_factors_warns_and_falls_back_to_cache_when_zip_is_invalid(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            cache_dir = Path(tmpdir)
            self._write_cache(cache_dir)
            with mock.patch(
                "factor_attribution._download_kf_zip_bytes",
                return_value=b"not-a-zip-file",
            ):
                with self.assertWarnsRegex(UserWarning, "cached data"):
                    factors = load_external_us_factors(cache_dir=cache_dir)

        self._assert_cached_fallback(factors)

    def test_load_external_us_factors_surfaces_cache_write_failures(self):
        zip_bytes = self._make_zip_bytes(self._KF_CSV_NAME, self._VALID_CSV_TEXT)

        with tempfile.TemporaryDirectory() as tmpdir:
            with mock.patch(
                "factor_attribution._download_kf_zip_bytes",
                return_value=zip_bytes,
            ):
                # A failed cache write must propagate, not be swallowed.
                with mock.patch("pandas.DataFrame.to_csv", side_effect=OSError("disk full")):
                    with self.assertRaises(OSError):
                        load_external_us_factors(cache_dir=Path(tmpdir))

    def test_load_external_us_factors_does_not_swallow_unrelated_local_failures(self):
        zip_bytes = self._make_zip_bytes(self._KF_CSV_NAME, self._VALID_CSV_TEXT)

        with tempfile.TemporaryDirectory() as tmpdir:
            cache_dir = Path(tmpdir)
            self._write_cache(cache_dir)
            with mock.patch(
                "factor_attribution._download_kf_zip_bytes",
                return_value=zip_bytes,
            ):
                # A local bug must not be masked by the cache-fallback path.
                with mock.patch(
                    "factor_attribution._parse_kf_daily_csv",
                    side_effect=RuntimeError("unexpected local bug"),
                ):
                    with self.assertRaises(RuntimeError):
                        load_external_us_factors(cache_dir=cache_dir)

    # ------------------------------------------------------------------
    # Fixture helpers
    # ------------------------------------------------------------------

    def _make_response_mock(self, *, read_return=None, read_side_effect=None) -> mock.MagicMock:
        """Build a context-manager mock standing in for a urlopen response."""
        response = mock.MagicMock()
        if read_side_effect is not None:
            response.read.side_effect = read_side_effect
        else:
            response.read.return_value = read_return
        response.__enter__.return_value = response
        response.__exit__.return_value = False
        return response

    def _make_cached_frame(self) -> pd.DataFrame:
        """Return a one-row factor frame matching the loader's cache schema."""
        return pd.DataFrame(
            {
                "MKT_RF": [0.01],
                "SMB": [0.0],
                "HML": [0.0],
                "RMW": [0.0],
                "CMA": [0.0],
                "RF": [0.0001],
            },
            index=pd.to_datetime(["2026-01-02"]),
        )

    def _write_cache(self, cache_dir: Path) -> None:
        """Seed *cache_dir* with the cached frame the loader falls back to."""
        self._make_cached_frame().to_csv(cache_dir / self._CACHE_FILENAME)

    def _assert_cached_fallback(self, factors: pd.DataFrame) -> None:
        """Assert *factors* carries exactly the seeded one-row cache contents."""
        self.assertEqual(len(factors), 1)
        self.assertAlmostEqual(factors.iloc[0]["MKT_RF"], 0.01)

    def _make_zip_bytes(self, filename: str, contents: str) -> bytes:
        """Return in-memory zip archive bytes containing a single text member."""
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, mode="w") as archive:
            archive.writestr(filename, contents)
        return buffer.getvalue()
class LocalFactorConstructionTests(unittest.TestCase):
    """Tests for locally constructed factor series on synthetic price paths.

    The look-ahead tests perturb prices strictly after a positional cutoff
    and require factor values at or before the cutoff to be unchanged,
    while values after the cutoff must actually differ.
    """

    # Expected column layouts for the two builders.
    _EXTENSION_COLUMNS = ["MOM", "LOWVOL", "RECOVERY"]
    _CORE_COLUMNS = ["MKT", "SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY"]
    _PROXY_COLUMNS = ["SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY"]
    # Rows before this positional index may still be NaN (rolling warmup).
    _WARMUP_ROWS = 260
    # Positional index at which "future" price mutations begin.
    _MUTATION_START = 280

    def test_build_extension_factors_returns_expected_columns_with_non_null_values_after_warmup(self):
        prices = self._make_price_frame(benchmark="SPY")

        factors = build_extension_factors(prices, benchmark="SPY", market="us")

        self.assertListEqual(list(factors.columns), self._EXTENSION_COLUMNS)
        self.assertTrue(factors.iloc[self._WARMUP_ROWS:].notna().all().all())
        # Non-degenerate: at least one factor value is non-zero after warmup.
        self.assertGreater(factors.iloc[self._WARMUP_ROWS:].abs().sum().sum(), 0.0)

    def test_build_proxy_core_factors_returns_expected_columns_with_non_null_values_after_warmup(self):
        prices = self._make_price_frame(benchmark="000300.SS")

        factors = build_proxy_core_factors(prices, benchmark="000300.SS", market="cn")

        self.assertListEqual(list(factors.columns), self._CORE_COLUMNS)
        self.assertTrue(factors.iloc[self._WARMUP_ROWS:].notna().all().all())
        self.assertGreater(factors.iloc[self._WARMUP_ROWS:].abs().sum().sum(), 0.0)

    def test_build_extension_factors_does_not_use_future_prices(self):
        prices = self._make_price_frame(benchmark="SPY")
        comparison_end, future_start = self._mutation_boundary(prices)
        mutated = prices.copy()
        mutated.loc[future_start:, "A"] = mutated.loc[future_start:, "A"] * 1.8
        mutated.loc[future_start:, "B"] = mutated.loc[future_start:, "B"] * 0.4

        original = build_extension_factors(prices, benchmark="SPY", market="us")
        changed = build_extension_factors(mutated, benchmark="SPY", market="us")

        # History up to the cutoff must be identical...
        pd.testing.assert_frame_equal(original.loc[:comparison_end], changed.loc[:comparison_end])
        # ...while the mutated future must actually move the factors.
        self.assertGreater(
            (original.loc[future_start:] - changed.loc[future_start:]).abs().sum().sum(),
            0.0,
        )

    def test_build_proxy_core_factors_market_branch_does_not_use_future_benchmark_prices(self):
        prices = self._make_price_frame(benchmark="000300.SS")
        comparison_end, future_start = self._mutation_boundary(prices)
        mutated = prices.copy()
        mutated.loc[future_start:, "000300.SS"] = mutated.loc[future_start:, "000300.SS"] * 1.4

        original = build_proxy_core_factors(prices, benchmark="000300.SS", market="cn")
        changed = build_proxy_core_factors(mutated, benchmark="000300.SS", market="cn")

        # MKT history up to the cutoff is untouched by future benchmark moves.
        pd.testing.assert_series_equal(
            original.loc[:comparison_end, "MKT"],
            changed.loc[:comparison_end, "MKT"],
            check_names=False,
        )
        # Proxy columns never read the benchmark, so the full series match.
        pd.testing.assert_frame_equal(
            original.loc[:, self._PROXY_COLUMNS],
            changed.loc[:, self._PROXY_COLUMNS],
        )
        self.assertGreater(
            (original.loc[future_start:, "MKT"] - changed.loc[future_start:, "MKT"]).abs().sum(),
            0.0,
        )

    def test_build_proxy_core_factors_proxy_columns_do_not_use_future_stock_prices(self):
        prices = self._make_price_frame(benchmark="000300.SS")
        comparison_end, future_start = self._mutation_boundary(prices)
        mutated = prices.copy()
        mutated.loc[future_start:, "C"] = mutated.loc[future_start:, "C"] * 0.35
        mutated.loc[future_start:, "D"] = mutated.loc[future_start:, "D"] * 1.6

        original = build_proxy_core_factors(prices, benchmark="000300.SS", market="cn")
        changed = build_proxy_core_factors(mutated, benchmark="000300.SS", market="cn")

        pd.testing.assert_frame_equal(
            original.loc[:comparison_end, self._PROXY_COLUMNS],
            changed.loc[:comparison_end, self._PROXY_COLUMNS],
        )
        self.assertGreater(
            (original.loc[future_start:, self._PROXY_COLUMNS] - changed.loc[future_start:, self._PROXY_COLUMNS]).abs().sum().sum(),
            0.0,
        )

    def test_build_proxy_core_factors_falls_back_to_equal_weight_market_when_benchmark_missing(self):
        prices_with_benchmark = self._make_price_frame(benchmark="CN_BENCH")
        prices = prices_with_benchmark.drop(columns=["CN_BENCH"])

        factors = build_proxy_core_factors(prices, benchmark="000300.SS", market="cn")
        reference = build_proxy_core_factors(prices_with_benchmark, benchmark="CN_BENCH", market="cn")

        # Missing benchmark column -> equal-weighted cross-sectional return.
        expected_market = prices.pct_change().mean(axis=1)
        pd.testing.assert_series_equal(factors["MKT"], expected_market, check_names=False)
        self.assertListEqual(list(factors.columns), self._CORE_COLUMNS)
        self.assertTrue(
            factors.iloc[self._WARMUP_ROWS:][self._PROXY_COLUMNS].notna().all().all()
        )
        self.assertGreater(
            factors.iloc[self._WARMUP_ROWS:][self._PROXY_COLUMNS].abs().sum().sum(),
            0.0,
        )
        # Proxy columns are benchmark-independent, so they must match exactly.
        pd.testing.assert_frame_equal(
            factors[self._PROXY_COLUMNS],
            reference[self._PROXY_COLUMNS],
        )

    # ------------------------------------------------------------------
    # Fixture helpers
    # ------------------------------------------------------------------

    def _mutation_boundary(self, prices: pd.DataFrame):
        """Return (comparison_end, future_start) timestamps around the cutoff."""
        return (
            prices.index[self._MUTATION_START - 1],
            prices.index[self._MUTATION_START],
        )

    def _make_price_frame(self, benchmark: str) -> pd.DataFrame:
        """Build a deterministic 320-business-day price panel plus *benchmark*.

        Each symbol follows exp(drift*t + amplitude*sin(t/frequency + phase)),
        giving smooth, varied, and fully reproducible price paths.
        """
        dates = pd.date_range("2025-01-01", periods=320, freq="B")
        steps = np.arange(len(dates), dtype=float)
        symbols = [
            ("A", 45.0, 0.0006, 0.030, 19.0, 0.1),
            ("B", 60.0, 0.0003, 0.025, 23.0, 0.8),
            ("C", 75.0, -0.0002, 0.035, 17.0, 1.4),
            ("D", 90.0, 0.0008, 0.020, 29.0, 0.5),
            ("E", 55.0, -0.0001, 0.028, 31.0, 1.9),
            ("F", 70.0, 0.0005, 0.032, 21.0, 2.5),
        ]
        data = {}
        for symbol, base, drift, amplitude, frequency, phase in symbols:
            log_path = drift * steps + amplitude * np.sin(steps / frequency + phase)
            data[symbol] = base * np.exp(log_path)

        benchmark_path = 0.0004 * steps + 0.018 * np.sin(steps / 27.0 + 0.3)
        data[benchmark] = 250.0 * np.exp(benchmark_path)
        return pd.DataFrame(data, index=dates)