Add local attribution factor builders
This commit is contained in:
@@ -10,6 +10,7 @@ from pathlib import Path
|
|||||||
from urllib.error import URLError
|
from urllib.error import URLError
|
||||||
from urllib.request import Request, urlopen
|
from urllib.request import Request, urlopen
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
KEN_FRENCH_DAILY_FF5_ZIP_URL = (
|
KEN_FRENCH_DAILY_FF5_ZIP_URL = (
|
||||||
@@ -140,3 +141,84 @@ def load_external_us_factors(cache_dir: Path | str = "data/factors") -> pd.DataF
|
|||||||
|
|
||||||
factors.to_csv(cache_path)
|
factors.to_csv(cache_path)
|
||||||
return factors
|
return factors
|
||||||
|
|
||||||
|
|
||||||
|
def _select_stock_prices(price_data: pd.DataFrame, benchmark: str) -> pd.DataFrame:
|
||||||
|
stocks = price_data.drop(columns=[benchmark], errors="ignore")
|
||||||
|
return stocks.sort_index().astype(float)
|
||||||
|
|
||||||
|
|
||||||
|
def _long_short_factor(
|
||||||
|
scores: pd.DataFrame,
|
||||||
|
returns: pd.DataFrame,
|
||||||
|
quantile: float = 0.3,
|
||||||
|
) -> pd.Series:
|
||||||
|
lagged_scores = scores.shift(1)
|
||||||
|
high_cutoff = lagged_scores.quantile(1 - quantile, axis=1)
|
||||||
|
low_cutoff = lagged_scores.quantile(quantile, axis=1)
|
||||||
|
|
||||||
|
long_mask = lagged_scores.ge(high_cutoff, axis=0)
|
||||||
|
short_mask = lagged_scores.le(low_cutoff, axis=0)
|
||||||
|
long_returns = returns.where(long_mask).mean(axis=1)
|
||||||
|
short_returns = returns.where(short_mask).mean(axis=1)
|
||||||
|
return (long_returns - short_returns).rename(None)
|
||||||
|
|
||||||
|
|
||||||
|
def build_extension_factors(
    price_data: pd.DataFrame,
    benchmark: str,
    market: str,
) -> pd.DataFrame:
    """Build MOM / LOWVOL / RECOVERY long-short factors from local prices.

    Returns a DataFrame aligned to ``price_data.index``; early rows are NaN
    until each rolling window has warmed up.
    """
    del market  # accepted for interface parity with other builders; unused

    stocks = _select_stock_prices(price_data, benchmark)
    returns = stocks.pct_change()

    # Momentum: skip the latest ~month (21 days), measure the prior ~231 days.
    momentum_scores = stocks.shift(21).pct_change(231)
    # Low volatility: negate the 60-day rolling std so calmer names rank higher.
    low_vol_scores = -returns.rolling(60, min_periods=60).std()
    # Recovery: distance above the trailing 63-day low.
    recovery_scores = stocks / stocks.rolling(63, min_periods=63).min() - 1.0

    score_panels = {
        "MOM": momentum_scores,
        "LOWVOL": low_vol_scores,
        "RECOVERY": recovery_scores,
    }
    factor_series = {
        name: _long_short_factor(panel, returns) for name, panel in score_panels.items()
    }
    return pd.DataFrame(factor_series, index=price_data.index)
|
||||||
|
|
||||||
|
|
||||||
|
def _positive_share(values: np.ndarray) -> float:
|
||||||
|
return float(np.mean(values > 0))
|
||||||
|
|
||||||
|
|
||||||
|
def build_proxy_core_factors(
    price_data: pd.DataFrame,
    benchmark: str,
    market: str,
) -> pd.DataFrame:
    """Build proxy core factors (MKT, SMB/HML/RMW/CMA proxies) from local prices.

    Returns a DataFrame aligned to ``price_data.index``; early rows are NaN
    until each rolling window has warmed up.
    """
    del market  # accepted for interface parity with other builders; unused

    stocks = _select_stock_prices(price_data, benchmark)
    returns = stocks.pct_change()

    # Market factor: the benchmark's return when present, otherwise the
    # equal-weight average stock return.
    if benchmark in price_data:
        market_factor = price_data[benchmark].astype(float).pct_change()
    else:
        market_factor = returns.mean(axis=1)

    # Size proxy: lower-priced names score higher.
    inverse_price_scores = -stocks
    # Value proxy: closeness to the trailing 252-day low.
    value_proxy_scores = -(stocks / stocks.rolling(252, min_periods=252).min() - 1.0)
    # Profitability proxy: share of up-days over a 63-day window.
    profitability_proxy_scores = returns.rolling(63, min_periods=63).apply(
        _positive_share, raw=True
    )
    # Investment proxy: slower 126-day price growth scores higher.
    investment_proxy_scores = -stocks.pct_change(126)

    proxy_panels = {
        "SMB_PROXY": inverse_price_scores,
        "HML_PROXY": value_proxy_scores,
        "RMW_PROXY": profitability_proxy_scores,
        "CMA_PROXY": investment_proxy_scores,
    }
    columns = {"MKT": market_factor}
    for name, panel in proxy_panels.items():
        columns[name] = _long_short_factor(panel, returns)
    return pd.DataFrame(columns, index=price_data.index)
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from pathlib import Path
|
|||||||
from urllib.error import URLError
|
from urllib.error import URLError
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
from factor_attribution import (
|
from factor_attribution import (
|
||||||
@@ -17,6 +18,8 @@ from factor_attribution import (
|
|||||||
KEN_FRENCH_DAILY_FF5_ZIP_URL,
|
KEN_FRENCH_DAILY_FF5_ZIP_URL,
|
||||||
_download_kf_zip_bytes,
|
_download_kf_zip_bytes,
|
||||||
_parse_kf_daily_csv,
|
_parse_kf_daily_csv,
|
||||||
|
build_extension_factors,
|
||||||
|
build_proxy_core_factors,
|
||||||
load_external_us_factors,
|
load_external_us_factors,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -289,3 +292,46 @@ class ExternalFactorLoaderTests(unittest.TestCase):
|
|||||||
with zipfile.ZipFile(buffer, mode="w") as archive:
|
with zipfile.ZipFile(buffer, mode="w") as archive:
|
||||||
archive.writestr(filename, contents)
|
archive.writestr(filename, contents)
|
||||||
return buffer.getvalue()
|
return buffer.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
class LocalFactorConstructionTests(unittest.TestCase):
    """Tests for the locally constructed factor builders."""

    def test_build_extension_factors_returns_expected_columns_with_non_null_values_after_warmup(self):
        prices = self._make_price_frame(benchmark="SPY")

        factors = build_extension_factors(prices, benchmark="SPY", market="us")

        self.assertListEqual(list(factors.columns), ["MOM", "LOWVOL", "RECOVERY"])
        # By row 260 every rolling window used by the builder has warmed up.
        warmed = factors.iloc[260:]
        self.assertTrue(warmed.notna().all().all())
        self.assertGreater(warmed.abs().sum().sum(), 0.0)

    def test_build_proxy_core_factors_returns_expected_columns_with_non_null_values_after_warmup(self):
        prices = self._make_price_frame(benchmark="000300.SS")

        factors = build_proxy_core_factors(prices, benchmark="000300.SS", market="cn")

        self.assertListEqual(
            list(factors.columns),
            ["MKT", "SMB_PROXY", "HML_PROXY", "RMW_PROXY", "CMA_PROXY"],
        )
        warmed = factors.iloc[260:]
        self.assertTrue(warmed.notna().all().all())
        self.assertGreater(warmed.abs().sum().sum(), 0.0)

    def _make_price_frame(self, benchmark: str) -> pd.DataFrame:
        """Build a deterministic 320-day price panel: six sinusoidal stocks plus a benchmark."""
        dates = pd.date_range("2025-01-01", periods=320, freq="B")
        steps = np.arange(len(dates), dtype=float)
        # (symbol, base price, drift, amplitude, frequency, phase)
        specs = [
            ("A", 45.0, 0.0006, 0.030, 19.0, 0.1),
            ("B", 60.0, 0.0003, 0.025, 23.0, 0.8),
            ("C", 75.0, -0.0002, 0.035, 17.0, 1.4),
            ("D", 90.0, 0.0008, 0.020, 29.0, 0.5),
            ("E", 55.0, -0.0001, 0.028, 31.0, 1.9),
            ("F", 70.0, 0.0005, 0.032, 21.0, 2.5),
        ]
        data = {
            symbol: base * np.exp(drift * steps + amplitude * np.sin(steps / frequency + phase))
            for symbol, base, drift, amplitude, frequency, phase in specs
        }
        data[benchmark] = 250.0 * np.exp(0.0004 * steps + 0.018 * np.sin(steps / 27.0 + 0.3))
        return pd.DataFrame(data, index=dates)
||||||
|
|||||||
Reference in New Issue
Block a user