"""Backtest rank-blended price/fundamental factor combinations on US equities.

The module builds a pack of price-derived factors, blends them (optionally
with a fundamental score) into candidate scores, turns each score into a
periodically rebalanced equal-weight top-N portfolio, and reports yearly
returns plus summary statistics against a recovery+momentum baseline.
"""

import numpy as np
import pandas as pd

from research.us_alpha_report import summarize_equity_window
from research.us_fundamentals import build_exploratory_fundamental_score
from strategies.recovery_momentum import RecoveryMomentumStrategy

TRADING_DAYS_PER_MONTH = 21


def xsec_rank(df: pd.DataFrame, ascending: bool = True) -> pd.DataFrame:
    """Return the row-wise (cross-sectional) percentile rank of *df*.

    NaN cells stay NaN (``na_option="keep"``).
    """
    return df.rank(axis=1, pct=True, na_option="keep", ascending=ascending)


def apply_filter_threshold(score: pd.DataFrame, filter_rank: pd.DataFrame, min_rank: float) -> pd.DataFrame:
    """Keep *score* only where *filter_rank* is at least *min_rank*.

    *filter_rank* is first reindexed to the rows and columns of *score*;
    cells that fail the threshold (or have no filter value) become NaN.
    """
    aligned_filter = filter_rank.reindex(index=score.index, columns=score.columns)
    return score.where(aligned_filter >= min_rank)


def weighted_rank_blend(factors: dict[str, pd.DataFrame], weights: dict[str, float]) -> pd.DataFrame:
    """Blend cross-sectional ranks of the named factors with the given weights.

    Each factor listed in *weights* is percentile-ranked per row, multiplied
    by its weight and summed. The sum is divided by the total weight when
    that total is positive; otherwise the unnormalised sum is returned
    (``None`` when *weights* is empty).

    Raises:
        KeyError: if a name in *weights* is missing from *factors*.
    """
    total = None
    total_weight = 0.0
    for name, weight in weights.items():
        component = xsec_rank(factors[name]) * weight
        # fill_value=0.0: a cell missing in only one operand is treated as 0
        # in the sum; a cell missing in both operands stays NaN. The
        # denominator below is the full weight either way.
        total = component if total is None else total.add(component, fill_value=0.0)
        total_weight += weight
    return total / total_weight if total_weight > 0 else total


def build_price_factor_pack(close: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Compute the price-only factor frames used by the candidate scores.

    All windows are counted in rows of *close*. Returned keys:

    - ``recovery``: close relative to its 63-row rolling minimum, minus 1.
    - ``momentum_12_1``: 231-row percent change of close lagged by 21 rows.
    - ``consistency``: 252-row rolling mean of "21-row return is positive".
    - ``inv_drawdown``: negated 252-row rolling minimum of the drawdown
      from the 252-row rolling maximum.
    - ``low_vol``: negated 60-row rolling std of 1-row returns.
    - ``dip_21``: negated 21-row return.
    - ``value_proxy``: 250-row rolling minimum divided by close.
    - ``uptrend``: 1.0 where close is above its 150-row rolling mean, else
      0.0 (warm-up rows compare against NaN and therefore yield 0.0).
    """
    # NOTE(review): the pct_change calls in this function rely on pandas'
    # default fill_method, whereas _backtest_from_weights passes
    # fill_method=None explicitly — confirm the difference is intended.
    monthly_ret = close.pct_change(TRADING_DAYS_PER_MONTH)
    rolling_max = close.rolling(252, min_periods=252).max()
    drawdown = close / rolling_max - 1.0
    return {
        "recovery": close / close.rolling(63, min_periods=63).min() - 1.0,
        "momentum_12_1": close.shift(21).pct_change(231),
        "consistency": monthly_ret.gt(0).rolling(252, min_periods=252).mean(),
        "inv_drawdown": -drawdown.rolling(252, min_periods=252).min(),
        "low_vol": -close.pct_change().rolling(60, min_periods=60).std(),
        "dip_21": -close.pct_change(21),
        "value_proxy": close.rolling(250, min_periods=250).min() / close,
        "uptrend": (close > close.rolling(150, min_periods=150).mean()).astype(float),
    }


def _monthly_score_weights(score: pd.DataFrame, top_n: int, rebal_freq: int = TRADING_DAYS_PER_MONTH) -> pd.DataFrame:
    """Turn a score frame into equal-weight top-N portfolio weights.

    Rebalances happen every *rebal_freq* rows, starting at the first row
    that has any non-NaN score. On a rebalance row the *top_n* highest
    scores each receive an equal weight; if fewer than *top_n* scores are
    valid on that row, all weights are zero. Weights are held until the
    next rebalance and finally shifted by one row, so a row's scores only
    affect the following row's weights.
    """
    score = score.sort_index()
    has_any = score.notna().any(axis=1)
    enough = score.notna().sum(axis=1) >= top_n

    # NaNs are ranked last; method="first" breaks ties by column order.
    rank = score.rank(axis=1, ascending=False, na_option="bottom", method="first")
    top_mask = (rank <= top_n) & enough.to_numpy().reshape(-1, 1)
    raw = top_mask.astype(float)
    row_sums = raw.sum(axis=1).replace(0.0, np.nan)
    weights = raw.div(row_sums, axis=0).fillna(0.0)

    first_valid = int(np.argmax(has_any.to_numpy())) if has_any.any() else 0
    rebal_mask = pd.Series(False, index=score.index)
    rebal_mask.iloc[list(range(first_valid, len(score), rebal_freq))] = True

    # Blank out non-rebalance rows, then carry the last rebalance forward.
    weights[~rebal_mask] = np.nan
    weights = weights.ffill().fillna(0.0)
    weights.iloc[:first_valid] = 0.0
    return weights.shift(1).fillna(0.0)


def _backtest_from_weights(
    close: pd.DataFrame,
    weights: pd.DataFrame,
    initial_capital: float = 10_000.0,
    transaction_cost: float = 0.001,
) -> pd.Series:
    """Simulate an equity curve from per-row portfolio weights.

    Args:
        close: price frame; its index defines the rows of the result.
        weights: portfolio weights, applied to the same row's return.
        initial_capital: starting value of the equity curve.
        transaction_cost: cost charged per unit of absolute weight change.

    Returns:
        Equity series indexed like *close*.
    """
    daily_returns = close.pct_change(fill_method=None).fillna(0.0)
    # Align once and derive BOTH returns and turnover from the same frame.
    # Computing turnover from the raw weights would align on the union of
    # the two indexes during the subtraction below and inject NaN / extra
    # rows whenever the weight index differs from the price index.
    aligned = weights.reindex(close.index).fillna(0.0)
    portfolio_returns = (daily_returns * aligned).sum(axis=1)
    turnover = aligned.diff().abs().sum(axis=1).fillna(0.0)
    portfolio_returns = portfolio_returns - turnover * transaction_cost
    return (1.0 + portfolio_returns).cumprod() * initial_capital


def _equity_to_yearly_returns(equity: pd.Series) -> pd.Series:
    """Return calendar-year returns of *equity*, keyed by year.

    Each year's return is the last non-NaN value in that year divided by
    the first non-NaN value in that year, minus 1. Years with fewer than
    two non-NaN observations are omitted. An empty or all-NaN input yields
    an empty series.
    """
    clean = equity.dropna()
    rows = {}
    if not clean.empty:
        first_year = int(clean.index.min().year)
        last_year = int(clean.index.max().year)
        for year in range(first_year, last_year + 1):
            year_start = pd.Timestamp(year=year, month=1, day=1)
            year_end = pd.Timestamp(year=year, month=12, day=31)
            window = clean.loc[(clean.index >= year_start) & (clean.index <= year_end)]
            if len(window) >= 2:
                rows[year] = window.iloc[-1] / window.iloc[0] - 1.0
    return pd.Series(rows, name=equity.name)


def _cagr(equity: pd.Series) -> float:
    """Return the compound annual growth rate of *equity*.

    Uses calendar days between the first and last non-NaN observation
    (365.25 days per year). Returns NaN when there is no data or the span
    is not positive.
    """
    clean = equity.dropna()
    if clean.empty:
        return np.nan
    years = (clean.index[-1] - clean.index[0]).days / 365.25
    if years <= 0:
        return np.nan
    return (clean.iloc[-1] / clean.iloc[0]) ** (1 / years) - 1


def _max_dd(equity: pd.Series) -> float:
    """Return the most negative drawdown of *equity* from its running maximum."""
    clean = equity.dropna()
    return (clean / clean.cummax() - 1.0).min()


def _candidate_scores(price_factors: dict[str, pd.DataFrame], fundamental_score: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Build the named candidate score frames to be backtested.

    Candidates are either weighted rank blends of the price factors and the
    fundamental score ("tilt" / "quality" / "mega" variants), or such a
    blend masked by a threshold on the fundamental rank or on the uptrend
    flag ("filter" / "trend" variants).
    """
    factors = {**price_factors, "fundamental": fundamental_score}
    base_rm = weighted_rank_blend(factors, {"recovery": 0.5, "momentum_12_1": 0.5})
    # The fundamental rank is shared by every fundamental filter below.
    fund_rank = xsec_rank(fundamental_score)

    candidates = {
        "rm_fund_filter_50": apply_filter_threshold(base_rm, fund_rank, min_rank=0.50),
        "rm_fund_filter_70": apply_filter_threshold(base_rm, fund_rank, min_rank=0.70),
        "rm_fund_tilt_20": weighted_rank_blend(factors, {"recovery": 0.4, "momentum_12_1": 0.4, "fundamental": 0.2}),
        "rm_fund_tilt_35": weighted_rank_blend(factors, {"recovery": 0.325, "momentum_12_1": 0.325, "fundamental": 0.35}),
        "rm_quality_fund": weighted_rank_blend(
            factors,
            {"recovery": 0.35, "momentum_12_1": 0.35, "consistency": 0.10, "inv_drawdown": 0.10, "fundamental": 0.10},
        ),
        "rm_quality_lowvol_fund": weighted_rank_blend(
            factors,
            {"recovery": 0.30, "momentum_12_1": 0.25, "consistency": 0.10, "inv_drawdown": 0.10, "low_vol": 0.10, "fundamental": 0.15},
        ),
        "mega_quality_fund": weighted_rank_blend(
            factors,
            {
                "recovery": 0.20,
                "momentum_12_1": 0.20,
                "consistency": 0.15,
                "inv_drawdown": 0.15,
                "low_vol": 0.10,
                "dip_21": 0.05,
                "value_proxy": 0.05,
                "fundamental": 0.10,
            },
        ),
        "mega_filter_fund_50": apply_filter_threshold(
            weighted_rank_blend(
                factors,
                {
                    "recovery": 0.25,
                    "momentum_12_1": 0.20,
                    "consistency": 0.10,
                    "inv_drawdown": 0.10,
                    "low_vol": 0.10,
                    "value_proxy": 0.10,
                    "fundamental": 0.15,
                },
            ),
            fund_rank,
            min_rank=0.50,
        ),
        # The uptrend flag is 0.0 / 1.0, so min_rank=0.50 keeps uptrend names only.
        "trend_rm_fund": apply_filter_threshold(
            weighted_rank_blend(factors, {"recovery": 0.35, "momentum_12_1": 0.35, "fundamental": 0.15, "low_vol": 0.15}),
            price_factors["uptrend"],
            min_rank=0.50,
        ),
    }
    return candidates


def run_combo_backtests(
    close: pd.DataFrame,
    fundamental_score: pd.DataFrame,
    top_n: int = 10,
    transaction_cost: float = 0.001,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Backtest the baseline, every candidate score and the SPY benchmark.

    Args:
        close: price frame; a ``"SPY"`` column, if present, is used only as
            a buy-and-hold benchmark and removed from the stock universe.
        fundamental_score: fundamental score frame, reindexed to the stock
            universe before use.
        top_n: number of names held by each strategy.
        transaction_cost: cost per unit of turnover passed to the backtest.

    Returns:
        ``(yearly, summary)`` where *yearly* holds calendar-year returns per
        strategy and *summary* holds one row of statistics per strategy,
        sorted by average annual return (descending).
    """
    # NOTE(review): the baseline label says "Top10" regardless of top_n.
    baseline_name = "Recovery+Mom Top10"

    benchmark_col = "SPY" if "SPY" in close.columns else None
    stock_close = close.drop(columns=[benchmark_col], errors="ignore").dropna(axis=1, how="all")
    fund = fundamental_score.reindex(index=stock_close.index, columns=stock_close.columns)
    price_factors = build_price_factor_pack(stock_close)

    equities: dict[str, pd.Series] = {}
    baseline = RecoveryMomentumStrategy(top_n=top_n)
    baseline_weights = baseline.generate_signals(stock_close)
    equities[baseline_name] = _backtest_from_weights(stock_close, baseline_weights, transaction_cost=transaction_cost)

    for name, score in _candidate_scores(price_factors, fund).items():
        weights = _monthly_score_weights(score.reindex(index=stock_close.index, columns=stock_close.columns), top_n=top_n)
        equities[name] = _backtest_from_weights(stock_close, weights, transaction_cost=transaction_cost)

    if benchmark_col is not None:
        spy = close[benchmark_col].dropna()
        # Buy-and-hold benchmark scaled to the backtests' default capital.
        equities["SPY"] = (spy / spy.iloc[0]) * 10_000.0

    yearly = pd.DataFrame({name: _equity_to_yearly_returns(eq) for name, eq in equities.items()}).sort_index()
    baseline_yearly = yearly[baseline_name]

    summary_rows = []
    for name, equity in equities.items():
        clean = equity.dropna()
        normalized = equity / clean.iloc[0]
        row = {
            "strategy": name,
            "CAGR": _cagr(equity),
            "MaxDD": _max_dd(equity),
            "TotalRet": clean.iloc[-1] / clean.iloc[0] - 1.0,
            "AvgAnnual": yearly[name].mean(),
            "MedianAnnual": yearly[name].median(),
            "YearsBeatRecovery": int(yearly[name].gt(baseline_yearly).sum()) if name != baseline_name else np.nan,
        }
        row.update({f"Win{window}Y": summarize_equity_window(normalized, name, window)["CAGR"] for window in (1, 3, 5, 10)})
        summary_rows.append(row)

    summary = pd.DataFrame(summary_rows).sort_values("AvgAnnual", ascending=False).reset_index(drop=True)
    return yearly, summary


def load_default_inputs(data_dir: str = "data") -> tuple[pd.DataFrame, pd.DataFrame]:
    """Load prices from ``<data_dir>/us.csv`` and build the fundamental score.

    Returns:
        ``(close, fundamental_score)``; *close* still contains the ``"SPY"``
        column if the file has one, while the fundamental score is built
        from the frame without it.
    """
    close = pd.read_csv(f"{data_dir}/us.csv", index_col=0, parse_dates=True).sort_index()
    stock_close = close.drop(columns=["SPY"], errors="ignore")
    fundamental_score = build_exploratory_fundamental_score(stock_close, data_dir=data_dir)
    return close, fundamental_score


def main() -> None:
    """Run the default backtests, write both CSV reports and print them."""
    close, fundamental_score = load_default_inputs()
    yearly, summary = run_combo_backtests(close, fundamental_score, top_n=10)
    yearly.to_csv("data/us_factor_combo_yearly.csv")
    summary.to_csv("data/us_factor_combo_summary.csv", index=False)

    print("=== Yearly Returns ===")
    print((yearly * 100.0).round(2).to_string())

    print("\n=== Summary ===")
    display_cols = ["strategy", "AvgAnnual", "MedianAnnual", "CAGR", "MaxDD", "YearsBeatRecovery", "Win1Y", "Win3Y", "Win5Y", "Win10Y"]
    # Every displayed column except the label and the year count is a
    # fraction that is shown as a percentage.
    pct_cols = ["AvgAnnual", "MedianAnnual", "CAGR", "MaxDD", "Win1Y", "Win3Y", "Win5Y", "Win10Y"]
    table = summary[display_cols].assign(**{col: summary[col] * 100.0 for col in pct_cols})
    print(table.round(2).to_string(index=False))


if __name__ == "__main__":
    main()