Add 12 strategy modules including adaptive blend, composite alpha, cross-asset momentum, ensemble alpha, trend rider v5/v6, and more.
763 lines
29 KiB
Python
763 lines
29 KiB
Python
"""Permanent Portfolio family — ported from usmart-quant TAA strategies.
|
||
|
||
Three strategies, all operating on a small ETF universe (SPY, TQQQ, UPRO,
|
||
GLD, DBC, TLT, SHY). Each `generate_signals(data)` returns a weights
|
||
DataFrame already 1-day lagged (PIT-safe), columns must be a subset of
|
||
``data.columns``.
|
||
|
||
* :class:`PermanentOverlay` — Browne's 25/25/25/25 with Faber MA200
|
||
overlay on the stock slot. Bullish → TQQQ; bearish → cash. Source:
|
||
``usmart-quant/strategies/taa_permanent_overlay.py``.
|
||
* :class:`TrendRiderV3` — risk-on/risk-off basket with momentum-ranked
|
||
pick, MA200 + vol/dd/peak gates, regime-min-hold + confirm + cooloff.
|
||
Source: ``usmart-quant/strategies/taa_trend_rider_v3.py``.
|
||
* :class:`PermanentV4` — improved Permanent. Stock slot picks the
|
||
momentum leader from (TQQQ, UPRO); bond slot rotates to SHY when TLT
|
||
is below its own MA200 (avoids 2022-style bond crashes); inflation
|
||
slot picks from (GLD, DBC). All four slots stay 25% — the same
|
||
diversification floor, but each slot self-rotates to its strongest
|
||
member.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
from strategies.base import Strategy
|
||
|
||
|
||
# Universe of ETFs the strategies trade. The runner ensures these are
|
||
# present as columns in the price DataFrame.
|
||
ETF_UNIVERSE = ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "TLT", "SHY"]
|
||
|
||
TREND_RIDER_V4_UNIVERSE = [
|
||
"SPY", "QQQ",
|
||
"SSO", "QLD", "UPRO", "TQQQ",
|
||
"SHY", "IEF", "TLT",
|
||
"GLD", "DBC",
|
||
]
|
||
|
||
# Global expansion: USD-listed leveraged ETFs giving HK/China exposure.
|
||
# YINN — 3x FTSE China 50 (mostly HK-listed: Tencent, Meituan, Alibaba HK ADR)
|
||
# CHAU — 3x CSI 300 A-shares (mainland blue-chips traded SH/SZ)
|
||
# Both trade in USD so they compose cleanly with TQQQ/UPRO. Full Yahoo
|
||
# history: YINN since 2010, CHAU since 2015-04.
|
||
GLOBAL_ETF_UNIVERSE = ETF_UNIVERSE + ["YINN", "CHAU"]
|
||
|
||
# HK-listed leveraged ETFs. Pure HK exposure (no proxy through ADRs):
|
||
# 7200.HK — HSI 2x (since 2017-03)
|
||
# 7500.HK — HSTECH 2x (since 2019-05)
|
||
# Note these trade in HKD; risk-off basket stays USD (GLD, DBC). Because
|
||
# HKD is pegged to USD (7.75–7.85), the FX drift over the test period is
|
||
# < 1% — acceptable as quasi-USD for this evaluation.
|
||
HK_ETF_UNIVERSE = ETF_UNIVERSE + ["7200.HK", "7500.HK"]
|
||
|
||
|
||
def _empty_weights(data: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
|
||
return pd.DataFrame(0.0, index=data.index, columns=cols)
|
||
|
||
|
||
class PermanentOverlay(Strategy):
|
||
"""Permanent Portfolio with Faber MA200 overlay on stock slot.
|
||
|
||
25% stock + 25% bonds + 25% gold + 25% cash. Stock slot holds TQQQ
|
||
when SPY > MA200 (PIT-lagged), else SHY (cash). Monthly rebalance.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
ma_window: int = 200,
|
||
rebal_every: int = 21,
|
||
signal: str = "SPY",
|
||
stock_on: str = "TQQQ",
|
||
stock_off: str = "SHY",
|
||
bonds: str = "TLT",
|
||
gold: str = "GLD",
|
||
cash: str = "SHY",
|
||
) -> None:
|
||
self.ma_window = ma_window
|
||
self.rebal_every = rebal_every
|
||
self.signal = signal
|
||
self.stock_on = stock_on
|
||
self.stock_off = stock_off
|
||
self.bonds = bonds
|
||
self.gold = gold
|
||
self.cash = cash
|
||
|
||
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
|
||
cols = list(set([self.signal, self.stock_on, self.stock_off,
|
||
self.bonds, self.gold, self.cash]))
|
||
cols = [c for c in cols if c in data.columns]
|
||
w = pd.DataFrame(np.nan, index=data.index, columns=cols)
|
||
|
||
spy = data[self.signal]
|
||
ma = spy.rolling(self.ma_window).mean()
|
||
bull = (spy > ma)
|
||
|
||
for i, dt in enumerate(data.index):
|
||
if i < self.ma_window:
|
||
continue
|
||
if (i - self.ma_window) % self.rebal_every != 0:
|
||
continue
|
||
row = {c: 0.0 for c in cols}
|
||
if bull.iloc[i]:
|
||
row[self.stock_on] = row.get(self.stock_on, 0.0) + 0.25
|
||
row[self.bonds] = row.get(self.bonds, 0.0) + 0.25
|
||
row[self.gold] = row.get(self.gold, 0.0) + 0.25
|
||
row[self.cash] = row.get(self.cash, 0.0) + 0.25
|
||
else:
|
||
# Stock slot collapses into cash → effective 50% cash
|
||
row[self.bonds] = row.get(self.bonds, 0.0) + 0.25
|
||
row[self.gold] = row.get(self.gold, 0.0) + 0.25
|
||
row[self.cash] = row.get(self.cash, 0.0) + 0.50
|
||
for s, ww in row.items():
|
||
if s in w.columns:
|
||
w.at[dt, s] = ww
|
||
|
||
# Forward-fill across non-rebal days (NaNs); fill warmup with 0.
|
||
w = w.ffill().fillna(0.0)
|
||
return w.shift(1).fillna(0.0)
|
||
|
||
|
||
class PermanentV4(Strategy):
|
||
"""Improved Permanent — Faber filters on stock + bond + commodity basket.
|
||
|
||
Slots (25% each):
|
||
stock: SPY > MA200 → max-momentum of (TQQQ, UPRO); else SHY
|
||
bond: TLT > MA200(TLT) → TLT; else SHY
|
||
gold: max-momentum of (GLD, DBC) over 63 days
|
||
cash: SHY (fixed)
|
||
|
||
Three targeted upgrades over PermanentOverlay (which only filters
|
||
the stock slot):
|
||
|
||
1. Bond slot Faber filter solves 2022 (TLT −29% kills static
|
||
Permanent's bond sleeve). Vanilla PermanentOverlay was −20.7%
|
||
in 2022; adding the bond filter alone halves that.
|
||
2. Stock slot picks momentum leader of (TQQQ, UPRO) — UPRO
|
||
substitutes when S&P leads QQQ (e.g. 2022 tech-led pullback).
|
||
3. Inflation slot rotates between GLD and DBC. GLD captures
|
||
deflation/stagflation (2020); DBC captures commodity-driven
|
||
inflation (2022). Picking the leader avoids GLD's 2022 flat
|
||
year while still owning gold when it leads.
|
||
|
||
Rebalance every 21 days. PIT-safe via terminal .shift(1).
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
ma_window: int = 200,
|
||
mom_lookback: int = 63,
|
||
rebal_every: int = 21,
|
||
regime_signal: str = "SPY",
|
||
stock_basket: tuple[str, ...] = ("TQQQ", "UPRO"),
|
||
gold_basket: tuple[str, ...] = ("GLD", "DBC"),
|
||
bond: str = "TLT",
|
||
cash: str = "SHY",
|
||
) -> None:
|
||
self.ma_window = ma_window
|
||
self.mom_lookback = mom_lookback
|
||
self.rebal_every = rebal_every
|
||
self.regime_signal = regime_signal
|
||
self.stock_basket = stock_basket
|
||
self.gold_basket = gold_basket
|
||
self.bond = bond
|
||
self.cash = cash
|
||
|
||
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
|
||
cols = list({self.regime_signal, *self.stock_basket, *self.gold_basket,
|
||
self.bond, self.cash})
|
||
cols = [c for c in cols if c in data.columns]
|
||
w = pd.DataFrame(np.nan, index=data.index, columns=cols)
|
||
|
||
spy = data[self.regime_signal]
|
||
spy_bull = spy > spy.rolling(self.ma_window).mean()
|
||
tlt_bull = data[self.bond] > data[self.bond].rolling(self.ma_window).mean()
|
||
mom = data.pct_change(self.mom_lookback)
|
||
|
||
warmup = max(self.ma_window, self.mom_lookback)
|
||
for i, dt in enumerate(data.index):
|
||
if i < warmup:
|
||
continue
|
||
if (i - warmup) % self.rebal_every != 0:
|
||
continue
|
||
slots: dict[str, float] = {c: 0.0 for c in cols}
|
||
|
||
# Stock slot
|
||
if spy_bull.iloc[i]:
|
||
pick, best = None, -np.inf
|
||
for s in self.stock_basket:
|
||
r = mom.at[dt, s] if s in mom.columns else np.nan
|
||
if pd.notna(r) and r > best:
|
||
best, pick = r, s
|
||
if pick is None:
|
||
pick = self.cash
|
||
else:
|
||
pick = self.cash
|
||
slots[pick] += 0.25
|
||
|
||
# Bond slot
|
||
slots[self.bond if tlt_bull.iloc[i] else self.cash] += 0.25
|
||
|
||
# Gold/commodity slot — basket leader by momentum (no MA filter:
|
||
# commodities are valuable diversifier even when not trending up)
|
||
pick, best = None, -np.inf
|
||
for s in self.gold_basket:
|
||
r = mom.at[dt, s] if s in mom.columns else np.nan
|
||
if pd.notna(r) and r > best:
|
||
best, pick = r, s
|
||
if pick is None:
|
||
pick = self.cash
|
||
slots[pick] += 0.25
|
||
|
||
slots[self.cash] += 0.25
|
||
|
||
for s, ww in slots.items():
|
||
if s in w.columns:
|
||
w.at[dt, s] = ww
|
||
|
||
w = w.ffill().fillna(0.0)
|
||
return w.shift(1).fillna(0.0)
|
||
|
||
|
||
class TrendRiderV3(Strategy):
|
||
"""Risk-on / risk-off basket with momentum-ranked pick + regime gates.
|
||
|
||
Faithful port of ``taa_trend_rider_v3.py`` with vol/MA/dd/peak
|
||
hysteresis, min-hold, confirm-days, entry stop-loss, and cooloff.
|
||
|
||
Output is a single 100% allocation to whichever basket member is the
|
||
momentum leader at the current regime. PIT-safe (1-day signal lag).
|
||
"""
|
||
|
||
DEFAULT_RISK_ON = ("TQQQ", "UPRO")
|
||
DEFAULT_RISK_OFF = ("GLD", "DBC")
|
||
|
||
def __init__(
|
||
self,
|
||
signal: str = "SPY",
|
||
risk_on: tuple[str, ...] = DEFAULT_RISK_ON,
|
||
risk_off: tuple[str, ...] = DEFAULT_RISK_OFF,
|
||
ma_long: int = 200,
|
||
ma_short: int = 50,
|
||
vol_window: int = 20,
|
||
vol_enter: float = 0.14,
|
||
vol_exit: float = 0.20,
|
||
dd_window: int = 40,
|
||
dd_stop: float = 0.05,
|
||
peak_window: int = 20,
|
||
peak_enter: float = 0.02,
|
||
peak_exit: float = 0.05,
|
||
regime_min_hold: int = 15,
|
||
instrument_min_hold: int = 30,
|
||
confirm_days: int = 3,
|
||
stop_loss_pct: float = 0.15,
|
||
cooloff_days: int = 20,
|
||
mom_lookback: int = 63,
|
||
) -> None:
|
||
self.signal = signal
|
||
self.risk_on = risk_on
|
||
self.risk_off = risk_off
|
||
self.ma_long = ma_long
|
||
self.ma_short = ma_short
|
||
self.vol_window = vol_window
|
||
self.vol_enter = vol_enter
|
||
self.vol_exit = vol_exit
|
||
self.dd_window = dd_window
|
||
self.dd_stop = dd_stop
|
||
self.peak_window = peak_window
|
||
self.peak_enter = peak_enter
|
||
self.peak_exit = peak_exit
|
||
self.regime_min_hold = regime_min_hold
|
||
self.instrument_min_hold = instrument_min_hold
|
||
self.confirm_days = confirm_days
|
||
self.stop_loss_pct = stop_loss_pct
|
||
self.cooloff_days = cooloff_days
|
||
self.mom_lookback = mom_lookback
|
||
|
||
@staticmethod
|
||
def _above_ma(closes: np.ndarray, w: int) -> bool:
|
||
return closes.size >= w and float(closes[-1]) > float(closes[-w:].mean())
|
||
|
||
@staticmethod
|
||
def _vol(closes: np.ndarray, w: int) -> float:
|
||
if closes.size < w + 1:
|
||
return float("nan")
|
||
rets = np.diff(closes[-w - 1:]) / np.maximum(closes[-w - 1:-1], 1e-12)
|
||
return float(rets.std(ddof=1) * np.sqrt(252))
|
||
|
||
@staticmethod
|
||
def _total_return(closes: np.ndarray, w: int) -> float:
|
||
if closes.size < w + 1 or closes[-w - 1] <= 0:
|
||
return float("nan")
|
||
return float(closes[-1] / closes[-w - 1] - 1.0)
|
||
|
||
def _desired_regime(self, closes: np.ndarray, current: str | None) -> str:
|
||
window_dd = closes[-self.dd_window:]
|
||
if closes[-1] / window_dd.max() - 1.0 <= -self.dd_stop:
|
||
return "risk_off"
|
||
if not self._above_ma(closes, self.ma_long):
|
||
return "risk_off"
|
||
v = self._vol(closes, self.vol_window)
|
||
if v != v:
|
||
v = 1.0
|
||
peak_ratio = closes[-1] / closes[-self.peak_window:].max()
|
||
if current == "risk_on":
|
||
if (self._above_ma(closes, self.ma_short)
|
||
and v < self.vol_exit
|
||
and peak_ratio >= 1.0 - self.peak_exit):
|
||
return "risk_on"
|
||
return "risk_off"
|
||
if (self._above_ma(closes, self.ma_short)
|
||
and v < self.vol_enter
|
||
and peak_ratio >= 1.0 - self.peak_enter):
|
||
return "risk_on"
|
||
return "risk_off"
|
||
|
||
def _pick_top(self, prices_t: np.ndarray, basket_idx: list[int],
|
||
closes_per_sym: dict[int, np.ndarray]) -> int | None:
|
||
best_i, best_r = None, -np.inf
|
||
for ix in basket_idx:
|
||
closes = closes_per_sym[ix]
|
||
r = self._total_return(closes, self.mom_lookback)
|
||
if r != r:
|
||
continue
|
||
if r > best_r:
|
||
best_r, best_i = r, ix
|
||
return best_i
|
||
|
||
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
|
||
cols = list({self.signal, *self.risk_on, *self.risk_off})
|
||
cols = [c for c in cols if c in data.columns]
|
||
sym_to_ix = {s: ix for ix, s in enumerate(cols)}
|
||
w = _empty_weights(data, cols)
|
||
|
||
if self.signal not in sym_to_ix:
|
||
return w.shift(1).fillna(0.0)
|
||
|
||
sig_arr = data[self.signal].to_numpy()
|
||
# Per-symbol close arrays (for momentum pick)
|
||
sym_arrays = {sym_to_ix[s]: data[s].to_numpy() for s in cols}
|
||
|
||
ron_idx = [sym_to_ix[s] for s in self.risk_on if s in sym_to_ix]
|
||
roff_idx = [sym_to_ix[s] for s in self.risk_off if s in sym_to_ix]
|
||
|
||
need = max(self.ma_long, self.vol_window + 1, self.dd_window,
|
||
self.peak_window, self.mom_lookback + 1) + 1
|
||
|
||
current_regime: str | None = None
|
||
bars_in_regime = 0
|
||
pending_regime: str | None = None
|
||
pending_count = 0
|
||
current_sym: int | None = None
|
||
bars_in_sym = 0
|
||
sym_entry_close: float | None = None
|
||
cooloff_remaining = 0
|
||
|
||
for t in range(len(data)):
|
||
if t < need:
|
||
continue
|
||
# Signal uses prices through t-1 (PIT lag)
|
||
sig_closes = sig_arr[: t]
|
||
if np.isnan(sig_closes[-1]):
|
||
continue
|
||
|
||
desired = self._desired_regime(sig_closes, current_regime)
|
||
emergency = (sig_closes[-1] / sig_closes[-self.dd_window:].max() - 1.0) <= -self.dd_stop
|
||
|
||
# Slice per-symbol closes through t-1
|
||
cps = {ix: arr[:t] for ix, arr in sym_arrays.items()}
|
||
cur_close = float(sig_arr[t - 1]) if not np.isnan(sig_arr[t - 1]) else None
|
||
# ^ used only for stop-loss reference computation below
|
||
|
||
def assign_one(sym_ix: int) -> None:
|
||
nonlocal current_sym, bars_in_sym, sym_entry_close
|
||
current_sym = sym_ix
|
||
bars_in_sym = 0
|
||
# Entry "fill" reference is today's close (but recorded at decision)
|
||
p = float(sym_arrays[sym_ix][t]) if t < sym_arrays[sym_ix].size else float("nan")
|
||
sym_entry_close = p if not np.isnan(p) else float(sym_arrays[sym_ix][t - 1])
|
||
|
||
# First placement
|
||
if current_regime is None:
|
||
basket = ron_idx if desired == "risk_on" else roff_idx
|
||
pick = self._pick_top(None, basket, cps)
|
||
if pick is None:
|
||
continue
|
||
current_regime = desired
|
||
bars_in_regime = 0
|
||
assign_one(pick)
|
||
w.iat[t, pick] = 1.0
|
||
continue
|
||
|
||
bars_in_regime += 1
|
||
bars_in_sym += 1
|
||
if cooloff_remaining > 0:
|
||
cooloff_remaining -= 1
|
||
|
||
in_on = current_regime == "risk_on"
|
||
sym_yclose = (float(sym_arrays[current_sym][t - 1])
|
||
if current_sym is not None and not np.isnan(sym_arrays[current_sym][t - 1])
|
||
else None)
|
||
|
||
# Stop-loss
|
||
if (in_on and sym_yclose is not None and sym_entry_close
|
||
and sym_yclose / sym_entry_close - 1.0 <= -self.stop_loss_pct):
|
||
pick = self._pick_top(None, roff_idx, cps)
|
||
if pick is not None:
|
||
current_regime = "risk_off"
|
||
bars_in_regime = 0
|
||
assign_one(pick)
|
||
pending_regime = None
|
||
pending_count = 0
|
||
cooloff_remaining = self.cooloff_days
|
||
w.iat[t, pick] = 1.0
|
||
continue
|
||
|
||
# Emergency dd stop
|
||
if emergency and current_regime != "risk_off":
|
||
pick = self._pick_top(None, roff_idx, cps)
|
||
if pick is not None:
|
||
current_regime = "risk_off"
|
||
bars_in_regime = 0
|
||
assign_one(pick)
|
||
pending_regime = None
|
||
pending_count = 0
|
||
w.iat[t, pick] = 1.0
|
||
continue
|
||
|
||
# Regime change with confirm + min-hold + cooloff
|
||
if desired != current_regime:
|
||
if current_regime == "risk_off" and cooloff_remaining > 0:
|
||
pending_regime = None
|
||
pending_count = 0
|
||
elif bars_in_regime < self.regime_min_hold:
|
||
pending_regime = None
|
||
pending_count = 0
|
||
else:
|
||
if desired != pending_regime:
|
||
pending_regime = desired
|
||
pending_count = 1
|
||
else:
|
||
pending_count += 1
|
||
if pending_count >= self.confirm_days:
|
||
basket = ron_idx if desired == "risk_on" else roff_idx
|
||
pick = self._pick_top(None, basket, cps)
|
||
if pick is None:
|
||
pick = current_sym
|
||
current_regime = desired
|
||
bars_in_regime = 0
|
||
assign_one(pick)
|
||
pending_regime = None
|
||
pending_count = 0
|
||
w.iat[t, pick] = 1.0
|
||
continue
|
||
# Hold prior allocation
|
||
if current_sym is not None:
|
||
w.iat[t, current_sym] = 1.0
|
||
continue
|
||
|
||
# Same regime — possibly rotate within basket
|
||
pending_regime = None
|
||
pending_count = 0
|
||
basket = ron_idx if current_regime == "risk_on" else roff_idx
|
||
top = self._pick_top(None, basket, cps)
|
||
if top is None or top == current_sym:
|
||
if current_sym is not None:
|
||
w.iat[t, current_sym] = 1.0
|
||
continue
|
||
if bars_in_sym < self.instrument_min_hold:
|
||
if current_sym is not None:
|
||
w.iat[t, current_sym] = 1.0
|
||
continue
|
||
assign_one(top)
|
||
w.iat[t, top] = 1.0
|
||
|
||
return w.shift(1).fillna(0.0)
|
||
|
||
|
||
class TrendRiderV4(Strategy):
|
||
"""Diversified TrendRider portfolio allocator.
|
||
|
||
V3 is a single-instrument state machine. V4 keeps the same broad regime
|
||
idea, but allocates across sleeves: core equity, capped leveraged equity,
|
||
defensive bonds/cash, and inflation hedges. It is still PIT-safe through a
|
||
terminal ``shift(1)``.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
signal: str = "SPY",
|
||
core_equity: tuple[str, ...] = ("SPY", "QQQ"),
|
||
leveraged_equity: tuple[str, ...] = ("SSO", "QLD", "UPRO", "TQQQ"),
|
||
defensive: tuple[str, ...] = ("SHY", "IEF", "TLT"),
|
||
inflation: tuple[str, ...] = ("GLD", "DBC"),
|
||
ma_long: int = 200,
|
||
ma_short: int = 50,
|
||
vol_window: int = 20,
|
||
vol_enter: float = 0.14,
|
||
vol_exit: float = 0.20,
|
||
dd_window: int = 40,
|
||
dd_stop: float = 0.05,
|
||
peak_window: int = 20,
|
||
peak_enter: float = 0.02,
|
||
peak_exit: float = 0.05,
|
||
regime_min_hold: int = 15,
|
||
confirm_days: int = 3,
|
||
mom_lookback: int = 63,
|
||
rebal_every: int = 21,
|
||
max_single_weight: float = 0.45,
|
||
max_leveraged_weight: float = 0.90,
|
||
risk_on_targets: tuple[float, float, float, float] = (0.10, 0.85, 0.00, 0.05),
|
||
risk_off_targets: tuple[float, float, float, float] = (0.30, 0.00, 0.50, 0.20),
|
||
) -> None:
|
||
self.signal = signal
|
||
self.core_equity = core_equity
|
||
self.leveraged_equity = leveraged_equity
|
||
self.defensive = defensive
|
||
self.inflation = inflation
|
||
self.ma_long = ma_long
|
||
self.ma_short = ma_short
|
||
self.vol_window = vol_window
|
||
self.vol_enter = vol_enter
|
||
self.vol_exit = vol_exit
|
||
self.dd_window = dd_window
|
||
self.dd_stop = dd_stop
|
||
self.peak_window = peak_window
|
||
self.peak_enter = peak_enter
|
||
self.peak_exit = peak_exit
|
||
self.regime_min_hold = regime_min_hold
|
||
self.confirm_days = confirm_days
|
||
self.mom_lookback = mom_lookback
|
||
self.rebal_every = rebal_every
|
||
self.max_single_weight = max_single_weight
|
||
self.max_leveraged_weight = max_leveraged_weight
|
||
self.risk_on_targets = risk_on_targets
|
||
self.risk_off_targets = risk_off_targets
|
||
|
||
def _desired_regime(self, closes: np.ndarray, current: str | None) -> str:
|
||
return TrendRiderV3(
|
||
signal=self.signal,
|
||
ma_long=self.ma_long,
|
||
ma_short=self.ma_short,
|
||
vol_window=self.vol_window,
|
||
vol_enter=self.vol_enter,
|
||
vol_exit=self.vol_exit,
|
||
dd_window=self.dd_window,
|
||
dd_stop=self.dd_stop,
|
||
peak_window=self.peak_window,
|
||
peak_enter=self.peak_enter,
|
||
peak_exit=self.peak_exit,
|
||
)._desired_regime(closes, current)
|
||
|
||
def _sleeve_weights(
|
||
self,
|
||
amount: float,
|
||
basket: tuple[str, ...],
|
||
cols: list[str],
|
||
mom_row: pd.Series,
|
||
vol_row: pd.Series,
|
||
top_n: int,
|
||
require_positive: bool = False,
|
||
) -> dict[str, float]:
|
||
if amount <= 0:
|
||
return {}
|
||
candidates = []
|
||
for sym in basket:
|
||
if sym not in cols or sym not in mom_row.index:
|
||
continue
|
||
mom = float(mom_row.get(sym, np.nan))
|
||
if not np.isfinite(mom):
|
||
continue
|
||
if require_positive and mom <= 0:
|
||
continue
|
||
vol = float(vol_row.get(sym, np.nan))
|
||
if not np.isfinite(vol) or vol <= 0:
|
||
vol = 0.20
|
||
candidates.append((sym, mom, max(vol, 0.05)))
|
||
if not candidates:
|
||
return {}
|
||
|
||
candidates.sort(key=lambda item: item[1], reverse=True)
|
||
selected = candidates[:max(1, top_n)]
|
||
inv_vol = np.array([1.0 / item[2] for item in selected], dtype=float)
|
||
inv_vol = inv_vol / inv_vol.sum()
|
||
return {sym: float(amount * weight) for (sym, _, _), weight in zip(selected, inv_vol)}
|
||
|
||
def _redistribute(self, row: dict[str, float], excess: float,
|
||
preferred: list[str]) -> float:
|
||
remaining = excess
|
||
for sym in preferred:
|
||
if remaining <= 1e-12:
|
||
break
|
||
if sym not in row:
|
||
continue
|
||
spare = max(self.max_single_weight - row.get(sym, 0.0), 0.0)
|
||
add = min(spare, remaining)
|
||
row[sym] = row.get(sym, 0.0) + add
|
||
remaining -= add
|
||
return remaining
|
||
|
||
def _apply_caps(self, row: dict[str, float], cols: list[str]) -> dict[str, float]:
|
||
row = {sym: float(weight) for sym, weight in row.items() if sym in cols and weight > 1e-12}
|
||
for sym in cols:
|
||
row.setdefault(sym, 0.0)
|
||
|
||
leveraged = [sym for sym in self.leveraged_equity if sym in row]
|
||
lev_total = sum(row[sym] for sym in leveraged)
|
||
excess = 0.0
|
||
if lev_total > self.max_leveraged_weight and lev_total > 0:
|
||
scale = self.max_leveraged_weight / lev_total
|
||
for sym in leveraged:
|
||
old = row[sym]
|
||
row[sym] = old * scale
|
||
excess += old - row[sym]
|
||
|
||
preferred = [*self.defensive, *self.inflation, *self.core_equity]
|
||
if excess > 1e-12:
|
||
excess = self._redistribute(row, excess, preferred)
|
||
|
||
for _ in range(len(row) + 1):
|
||
over = [sym for sym, weight in row.items() if weight > self.max_single_weight]
|
||
if not over:
|
||
break
|
||
for sym in over:
|
||
excess += row[sym] - self.max_single_weight
|
||
row[sym] = self.max_single_weight
|
||
excess = self._redistribute(row, excess, preferred)
|
||
if excess <= 1e-12:
|
||
break
|
||
|
||
if excess > 1e-12:
|
||
receivers = [sym for sym in row if row[sym] < self.max_single_weight - 1e-12]
|
||
spare = sum(self.max_single_weight - row[sym] for sym in receivers)
|
||
if spare > 0:
|
||
for sym in receivers:
|
||
add = excess * (self.max_single_weight - row[sym]) / spare
|
||
row[sym] += add
|
||
excess = 0.0
|
||
|
||
total = sum(row.values())
|
||
if total > 0:
|
||
row = {sym: weight / total for sym, weight in row.items()}
|
||
return {sym: weight for sym, weight in row.items() if weight > 1e-10}
|
||
|
||
def _allocate(self, regime: str, cols: list[str],
|
||
mom_row: pd.Series, vol_row: pd.Series) -> dict[str, float]:
|
||
if regime == "risk_on":
|
||
core, leveraged, defensive, inflation = self.risk_on_targets
|
||
sleeve_targets = {
|
||
"core": core,
|
||
"leveraged": leveraged,
|
||
"defensive": defensive,
|
||
"inflation": inflation,
|
||
}
|
||
else:
|
||
core, leveraged, defensive, inflation = self.risk_off_targets
|
||
sleeve_targets = {
|
||
"core": core,
|
||
"leveraged": leveraged,
|
||
"defensive": defensive,
|
||
"inflation": inflation,
|
||
}
|
||
|
||
row: dict[str, float] = {sym: 0.0 for sym in cols}
|
||
sleeves = [
|
||
(sleeve_targets["core"], self.core_equity, 2, False),
|
||
(sleeve_targets["leveraged"], self.leveraged_equity, 2, True),
|
||
(sleeve_targets["defensive"], self.defensive, 2, False),
|
||
(sleeve_targets["inflation"], self.inflation, 2, False),
|
||
]
|
||
unallocated = 0.0
|
||
for amount, basket, top_n, require_positive in sleeves:
|
||
alloc = self._sleeve_weights(amount, basket, cols, mom_row, vol_row, top_n, require_positive)
|
||
if not alloc:
|
||
unallocated += amount
|
||
continue
|
||
for sym, weight in alloc.items():
|
||
row[sym] += weight
|
||
|
||
if unallocated > 0:
|
||
fallback = next((sym for sym in self.defensive if sym in cols), None)
|
||
if fallback is not None:
|
||
row[fallback] += unallocated
|
||
|
||
return self._apply_caps(row, cols)
|
||
|
||
def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
|
||
cols = list({
|
||
self.signal,
|
||
*self.core_equity,
|
||
*self.leveraged_equity,
|
||
*self.defensive,
|
||
*self.inflation,
|
||
})
|
||
cols = [c for c in cols if c in data.columns]
|
||
w = pd.DataFrame(np.nan, index=data.index, columns=cols)
|
||
|
||
if self.signal not in data.columns:
|
||
return _empty_weights(data, cols).shift(1).fillna(0.0)
|
||
|
||
signal_arr = data[self.signal].to_numpy()
|
||
returns = data[cols].pct_change(fill_method=None)
|
||
momentum = data[cols].pct_change(self.mom_lookback, fill_method=None)
|
||
vol = returns.rolling(self.vol_window).std() * np.sqrt(252)
|
||
need = max(self.ma_long, self.vol_window + 1, self.dd_window,
|
||
self.peak_window, self.mom_lookback + 1)
|
||
|
||
current_regime: str | None = None
|
||
bars_in_regime = 0
|
||
pending_regime: str | None = None
|
||
pending_count = 0
|
||
|
||
for i, dt in enumerate(data.index):
|
||
if i < need:
|
||
continue
|
||
closes = signal_arr[: i + 1]
|
||
if np.isnan(closes[-1]):
|
||
continue
|
||
|
||
desired = self._desired_regime(closes, current_regime)
|
||
regime_changed = False
|
||
if current_regime is None:
|
||
current_regime = desired
|
||
bars_in_regime = 0
|
||
regime_changed = True
|
||
else:
|
||
bars_in_regime += 1
|
||
if desired != current_regime:
|
||
if bars_in_regime >= self.regime_min_hold:
|
||
if desired != pending_regime:
|
||
pending_regime = desired
|
||
pending_count = 1
|
||
else:
|
||
pending_count += 1
|
||
if pending_count >= self.confirm_days:
|
||
current_regime = desired
|
||
bars_in_regime = 0
|
||
pending_regime = None
|
||
pending_count = 0
|
||
regime_changed = True
|
||
else:
|
||
pending_regime = None
|
||
pending_count = 0
|
||
else:
|
||
pending_regime = None
|
||
pending_count = 0
|
||
|
||
if not regime_changed and (i - need) % self.rebal_every != 0:
|
||
continue
|
||
|
||
row = self._allocate(
|
||
current_regime,
|
||
cols,
|
||
momentum.iloc[i],
|
||
vol.iloc[i],
|
||
)
|
||
w.loc[dt, cols] = 0.0
|
||
for sym, weight in row.items():
|
||
w.at[dt, sym] = weight
|
||
|
||
w = w.ffill().fillna(0.0)
|
||
return w.shift(1).fillna(0.0)
|