Files
quant/strategies/permanent.py
Gahow Wang d086930ab3 feat: add new trading strategies
Add 12 strategy modules including adaptive blend, composite alpha,
cross-asset momentum, ensemble alpha, trend rider v5/v6, and more.
2026-05-14 12:54:05 +08:00

763 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Permanent Portfolio family — ported from usmart-quant TAA strategies.
Three strategies, all operating on a small ETF universe (SPY, TQQQ, UPRO,
GLD, DBC, TLT, SHY). Each `generate_signals(data)` returns a weights
DataFrame already 1-day lagged (PIT-safe), columns must be a subset of
``data.columns``.
* :class:`PermanentOverlay` — Browne's 25/25/25/25 with Faber MA200
overlay on the stock slot. Bullish → TQQQ; bearish → cash. Source:
``usmart-quant/strategies/taa_permanent_overlay.py``.
* :class:`TrendRiderV3` — risk-on/risk-off basket with momentum-ranked
pick, MA200 + vol/dd/peak gates, regime-min-hold + confirm + cooloff.
Source: ``usmart-quant/strategies/taa_trend_rider_v3.py``.
* :class:`PermanentV4` — improved Permanent. Stock slot picks the
momentum leader from (TQQQ, UPRO); bond slot rotates to SHY when TLT
is below its own MA200 (avoids 2022-style bond crashes); inflation
slot picks from (GLD, DBC). All four slots stay 25% — the same
diversification floor, but each slot self-rotates to its strongest
member.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from strategies.base import Strategy
# Universe of ETFs the strategies trade. The runner ensures these are
# present as columns in the price DataFrame.
ETF_UNIVERSE = ["SPY", "TQQQ", "UPRO", "GLD", "DBC", "TLT", "SHY"]
TREND_RIDER_V4_UNIVERSE = [
"SPY", "QQQ",
"SSO", "QLD", "UPRO", "TQQQ",
"SHY", "IEF", "TLT",
"GLD", "DBC",
]
# Global expansion: USD-listed leveraged ETFs giving HK/China exposure.
# YINN — 3x FTSE China 50 (mostly HK-listed: Tencent, Meituan, Alibaba HK ADR)
# CHAU — 3x CSI 300 A-shares (mainland blue-chips traded SH/SZ)
# Both trade in USD so they compose cleanly with TQQQ/UPRO. Full Yahoo
# history: YINN since 2010, CHAU since 2015-04.
GLOBAL_ETF_UNIVERSE = ETF_UNIVERSE + ["YINN", "CHAU"]
# HK-listed leveraged ETFs. Pure HK exposure (no proxy through ADRs):
# 7200.HK — HSI 2x (since 2017-03)
# 7500.HK — HSTECH 2x (since 2019-05)
# Note these trade in HKD; risk-off basket stays USD (GLD, DBC). Because
# HKD is pegged to USD (7.75-7.85 band), the FX drift over the test period is
# < 1% — acceptable as quasi-USD for this evaluation.
HK_ETF_UNIVERSE = ETF_UNIVERSE + ["7200.HK", "7500.HK"]
def _empty_weights(data: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
return pd.DataFrame(0.0, index=data.index, columns=cols)
class PermanentOverlay(Strategy):
    """Permanent Portfolio with a Faber MA200 overlay on the stock slot.

    Four 25% slots: stock, bonds, gold and cash. On every rebalance bar
    the stock slot holds ``stock_on`` when the ``signal`` close is above
    its ``ma_window``-bar simple moving average, otherwise ``stock_off``.
    With the defaults ``stock_off`` and ``cash`` are both SHY, so a
    bearish reading gives an effective 50% cash position.

    Targets are set every ``rebal_every`` bars starting at bar
    ``ma_window``, carried forward in between, and shifted one row before
    being returned, so row *t* depends only on prices up to *t-1*.
    """

    def __init__(
        self,
        ma_window: int = 200,
        rebal_every: int = 21,
        signal: str = "SPY",
        stock_on: str = "TQQQ",
        stock_off: str = "SHY",
        bonds: str = "TLT",
        gold: str = "GLD",
        cash: str = "SHY",
    ) -> None:
        """Store the configuration.

        Args:
            ma_window: Length of the moving average applied to ``signal``.
            rebal_every: Number of bars between rebalances.
            signal: Column whose close drives the bull/bear decision.
            stock_on: Symbol held in the stock slot when bullish.
            stock_off: Symbol held in the stock slot when bearish.
            bonds: Symbol held in the bond slot.
            gold: Symbol held in the gold slot.
            cash: Symbol held in the cash slot.
        """
        self.ma_window = ma_window
        self.rebal_every = rebal_every
        self.signal = signal
        self.stock_on = stock_on
        self.stock_off = stock_off
        self.bonds = bonds
        self.gold = gold
        self.cash = cash

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Build the lagged weight frame.

        Args:
            data: Price frame; one column per symbol.

        Returns:
            Weights indexed like *data*. Columns are the configured
            symbols that exist in *data*, in configuration order. Slots
            whose symbol is absent from *data* are dropped, so a row may
            sum to less than 1. All zeros when ``signal`` is absent.
        """
        # dict.fromkeys de-duplicates while keeping a reproducible order;
        # a set would make column order depend on string-hash randomisation.
        wanted = dict.fromkeys(
            (self.signal, self.stock_on, self.stock_off,
             self.bonds, self.gold, self.cash)
        )
        cols = [c for c in wanted if c in data.columns]

        # Same convention as the TrendRider strategies: no signal, no position.
        if self.signal not in data.columns:
            return _empty_weights(data, cols)

        close = data[self.signal]
        # NaN on either side compares False, i.e. is treated as bearish.
        bull = (close > close.rolling(self.ma_window).mean()).to_numpy()

        n_rows = len(data.index)
        col_ix = {c: j for j, c in enumerate(cols)}
        # NaN marks "no decision on this bar" so ffill can carry targets.
        grid = np.full((n_rows, len(cols)), np.nan)

        for i in range(self.ma_window, n_rows):
            if (i - self.ma_window) % self.rebal_every != 0:
                continue
            # The bearish stock slot goes to stock_off, not straight to cash;
            # with the defaults both are SHY, giving the documented 50% cash.
            stock = self.stock_on if bull[i] else self.stock_off
            grid[i, :] = 0.0
            for sym in (stock, self.bonds, self.gold, self.cash):
                j = col_ix.get(sym)
                if j is not None:
                    grid[i, j] += 0.25

        w = pd.DataFrame(grid, index=data.index, columns=cols)
        # Hold targets between rebalances; warm-up rows become 0.
        w = w.ffill().fillna(0.0)
        # One-row lag keeps the weights point-in-time safe.
        return w.shift(1).fillna(0.0)
class PermanentV4(Strategy):
    """Improved Permanent: Faber filters on stock + bond, rotating baskets.

    Slots (25% each):
        stock: regime signal > MA -> momentum leader of ``stock_basket``;
               otherwise cash
        bond:  bond > its own MA -> bond; otherwise cash
        gold:  momentum leader of ``gold_basket`` over ``mom_lookback``
               bars (no MA filter)
        cash:  ``cash`` (fixed)

    Three targeted upgrades over PermanentOverlay (which only filters
    the stock slot):
        1. Bond slot Faber filter addresses 2022 (TLT down about 29%,
           which hurts a static bond sleeve). Vanilla PermanentOverlay
           was down 20.7% in 2022; adding the bond filter alone halves
           that.
        2. Stock slot picks the momentum leader of (TQQQ, UPRO); UPRO
           substitutes when S&P leads QQQ (e.g. 2022 tech-led pullback).
        3. Inflation slot rotates between GLD and DBC. GLD captures
           deflation/stagflation (2020); DBC captures commodity-driven
           inflation (2022). Picking the leader avoids GLD's flat 2022
           while still owning gold when it leads.

    Rebalance every ``rebal_every`` bars. PIT-safe via terminal .shift(1).
    """

    def __init__(
        self,
        ma_window: int = 200,
        mom_lookback: int = 63,
        rebal_every: int = 21,
        regime_signal: str = "SPY",
        stock_basket: tuple[str, ...] = ("TQQQ", "UPRO"),
        gold_basket: tuple[str, ...] = ("GLD", "DBC"),
        bond: str = "TLT",
        cash: str = "SHY",
    ) -> None:
        """Store the configuration.

        Args:
            ma_window: Moving-average length for the stock and bond filters.
            mom_lookback: Bars used for the momentum (percent change) rank.
            rebal_every: Number of bars between rebalances.
            regime_signal: Column that gates the stock slot.
            stock_basket: Candidates for the stock slot.
            gold_basket: Candidates for the gold/commodity slot.
            bond: Symbol for the bond slot.
            cash: Symbol for the cash slot and every fallback.
        """
        self.ma_window = ma_window
        self.mom_lookback = mom_lookback
        self.rebal_every = rebal_every
        self.regime_signal = regime_signal
        self.stock_basket = stock_basket
        self.gold_basket = gold_basket
        self.bond = bond
        self.cash = cash

    @staticmethod
    def _momentum_leader(mom_row: pd.Series,
                         basket: tuple[str, ...]) -> str | None:
        """Return the basket member with the highest non-NaN momentum.

        Ties go to the member listed first. Returns ``None`` when no
        member has a usable momentum value.
        """
        leader, best = None, -np.inf
        for sym in basket:
            value = mom_row.get(sym, np.nan)
            if pd.notna(value) and value > best:
                best, leader = value, sym
        return leader

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Build the lagged weight frame.

        Args:
            data: Price frame; one column per symbol.

        Returns:
            Weights indexed like *data*. Columns are the configured
            symbols present in *data*, in configuration order. A slot
            whose pick is unavailable falls back to ``cash``; if ``cash``
            itself is absent from *data* that weight is dropped. All
            zeros when ``regime_signal`` is absent.
        """
        # Ordered de-duplication: a set would make column order depend on
        # string-hash randomisation.
        wanted = dict.fromkeys(
            (self.regime_signal, *self.stock_basket, *self.gold_basket,
             self.bond, self.cash)
        )
        cols = [c for c in wanted if c in data.columns]
        if self.regime_signal not in data.columns:
            return _empty_weights(data, cols)

        n_rows = len(data.index)
        regime_px = data[self.regime_signal]
        stock_bull = (
            regime_px > regime_px.rolling(self.ma_window).mean()
        ).to_numpy()

        if self.bond in data.columns:
            bond_px = data[self.bond]
            bond_bull = (
                bond_px > bond_px.rolling(self.ma_window).mean()
            ).to_numpy()
        else:
            # No bond series: the bond slot always parks in cash.
            bond_bull = np.zeros(n_rows, dtype=bool)

        # fill_method=None: gaps stay NaN (and the symbol is skipped) instead
        # of relying on pandas' deprecated implicit forward-fill.
        mom = data[cols].pct_change(self.mom_lookback, fill_method=None)

        warmup = max(self.ma_window, self.mom_lookback)
        col_ix = {c: j for j, c in enumerate(cols)}
        # NaN marks "no decision on this bar" so ffill can carry targets.
        grid = np.full((n_rows, len(cols)), np.nan)

        for i in range(warmup, n_rows):
            if (i - warmup) % self.rebal_every != 0:
                continue
            mom_row = mom.iloc[i]

            stock = (self._momentum_leader(mom_row, self.stock_basket)
                     if stock_bull[i] else None)
            bond = self.bond if bond_bull[i] else None
            # No MA filter here: commodities are kept as a diversifier even
            # when they are not trending up.
            hedge = self._momentum_leader(mom_row, self.gold_basket)

            grid[i, :] = 0.0
            for pick in (stock, bond, hedge, self.cash):
                sym = self.cash if pick is None else pick
                j = col_ix.get(sym)
                if j is not None:
                    grid[i, j] += 0.25

        w = pd.DataFrame(grid, index=data.index, columns=cols)
        # Hold targets between rebalances; warm-up rows become 0.
        w = w.ffill().fillna(0.0)
        return w.shift(1).fillna(0.0)
class TrendRiderV3(Strategy):
    """Risk-on / risk-off basket with momentum-ranked pick + regime gates.

    Port of ``taa_trend_rider_v3.py`` with vol/MA/dd/peak hysteresis,
    min-hold, confirm-days, entry stop-loss, and cooloff.

    Output is a single 100% allocation to whichever basket member is the
    momentum leader for the current regime.

    Timing: the decision written at row *t* reads closes through *t-1*,
    and the finished frame is then shifted one more row.
    NOTE(review): that is an effective two-bar lag, one more than the
    other strategies in this module -- confirm it is intended.
    """

    DEFAULT_RISK_ON = ("TQQQ", "UPRO")
    DEFAULT_RISK_OFF = ("GLD", "DBC")

    def __init__(
        self,
        signal: str = "SPY",
        risk_on: tuple[str, ...] = DEFAULT_RISK_ON,
        risk_off: tuple[str, ...] = DEFAULT_RISK_OFF,
        ma_long: int = 200,
        ma_short: int = 50,
        vol_window: int = 20,
        vol_enter: float = 0.14,
        vol_exit: float = 0.20,
        dd_window: int = 40,
        dd_stop: float = 0.05,
        peak_window: int = 20,
        peak_enter: float = 0.02,
        peak_exit: float = 0.05,
        regime_min_hold: int = 15,
        instrument_min_hold: int = 30,
        confirm_days: int = 3,
        stop_loss_pct: float = 0.15,
        cooloff_days: int = 20,
        mom_lookback: int = 63,
    ) -> None:
        """Store the configuration.

        Args:
            signal: Column whose closes drive the regime decision.
            risk_on: Candidate symbols while in the risk-on regime.
            risk_off: Candidate symbols while in the risk-off regime.
            ma_long: Long moving-average window (hard risk-off gate).
            ma_short: Short moving-average window (risk-on gate).
            vol_window: Bars used for annualised realised volatility.
            vol_enter: Volatility must be below this to enter risk-on.
            vol_exit: Volatility must stay below this to remain risk-on.
            dd_window: Look-back for the drawdown stop.
            dd_stop: Drawdown from the window high that forces risk-off.
            peak_window: Look-back for the distance-from-peak gate.
            peak_enter: Max distance below the peak to enter risk-on.
            peak_exit: Max distance below the peak to remain risk-on.
            regime_min_hold: Bars a regime must last before it may flip.
            instrument_min_hold: Bars a symbol is held before an
                in-regime rotation is allowed.
            confirm_days: Consecutive bars a new regime must be desired.
            stop_loss_pct: Loss from entry that ejects a risk-on holding.
            cooloff_days: Bars after a stop-loss during which risk-off
                cannot flip back.
            mom_lookback: Bars for the total-return momentum rank.
        """
        self.signal = signal
        self.risk_on = risk_on
        self.risk_off = risk_off
        self.ma_long = ma_long
        self.ma_short = ma_short
        self.vol_window = vol_window
        self.vol_enter = vol_enter
        self.vol_exit = vol_exit
        self.dd_window = dd_window
        self.dd_stop = dd_stop
        self.peak_window = peak_window
        self.peak_enter = peak_enter
        self.peak_exit = peak_exit
        self.regime_min_hold = regime_min_hold
        self.instrument_min_hold = instrument_min_hold
        self.confirm_days = confirm_days
        self.stop_loss_pct = stop_loss_pct
        self.cooloff_days = cooloff_days
        self.mom_lookback = mom_lookback

    @staticmethod
    def _above_ma(closes: np.ndarray, w: int) -> bool:
        """True when the last close exceeds the mean of the last *w* closes."""
        return closes.size >= w and float(closes[-1]) > float(closes[-w:].mean())

    @staticmethod
    def _vol(closes: np.ndarray, w: int) -> float:
        """Annualised (x sqrt(252)) sample std of the last *w* simple returns.

        Returns NaN when fewer than ``w + 1`` closes are available.
        """
        if closes.size < w + 1:
            return float("nan")
        # Denominator is floored to avoid dividing by zero.
        rets = np.diff(closes[-w - 1:]) / np.maximum(closes[-w - 1:-1], 1e-12)
        return float(rets.std(ddof=1) * np.sqrt(252))

    @staticmethod
    def _total_return(closes: np.ndarray, w: int) -> float:
        """Simple return over the last *w* bars, or NaN if unavailable.

        NaN is returned when there is not enough history or the base
        price is not positive.
        """
        if closes.size < w + 1 or closes[-w - 1] <= 0:
            return float("nan")
        return float(closes[-1] / closes[-w - 1] - 1.0)

    def _desired_regime(self, closes: np.ndarray, current: str | None) -> str:
        """Return ``"risk_on"`` or ``"risk_off"`` for the given close history.

        Hard gates first (drawdown stop, long MA), then hysteresis: the
        looser ``*_exit`` thresholds apply while already risk-on, the
        tighter ``*_enter`` thresholds otherwise.
        """
        window_dd = closes[-self.dd_window:]
        if closes[-1] / window_dd.max() - 1.0 <= -self.dd_stop:
            return "risk_off"
        if not self._above_ma(closes, self.ma_long):
            return "risk_off"
        v = self._vol(closes, self.vol_window)
        if v != v:  # NaN check
            # Unknown volatility is treated as very high (fails both gates
            # with the default thresholds).
            v = 1.0
        peak_ratio = closes[-1] / closes[-self.peak_window:].max()
        if current == "risk_on":
            if (self._above_ma(closes, self.ma_short)
                    and v < self.vol_exit
                    and peak_ratio >= 1.0 - self.peak_exit):
                return "risk_on"
            return "risk_off"
        if (self._above_ma(closes, self.ma_short)
                and v < self.vol_enter
                and peak_ratio >= 1.0 - self.peak_enter):
            return "risk_on"
        return "risk_off"

    def _pick_top(self, prices_t: np.ndarray | None, basket_idx: list[int],
                  closes_per_sym: dict[int, np.ndarray]) -> int | None:
        """Return the column index with the best momentum in *basket_idx*.

        Args:
            prices_t: Unused; kept for signature compatibility.
            basket_idx: Candidate column indices, in preference order
                (ties go to the earlier entry).
            closes_per_sym: Close history per column index.

        Returns:
            The winning index, or ``None`` when every candidate has NaN
            momentum (or the basket is empty).
        """
        best_i, best_r = None, -np.inf
        for ix in basket_idx:
            closes = closes_per_sym[ix]
            r = self._total_return(closes, self.mom_lookback)
            if r != r:  # NaN
                continue
            if r > best_r:
                best_r, best_i = r, ix
        return best_i

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Run the regime state machine and return lagged weights.

        Args:
            data: Price frame; one column per symbol.

        Returns:
            Weights indexed like *data*, columns being the configured
            symbols present in *data* (configuration order). Each row is
            either all zero or has a single 1.0. All zeros when
            ``signal`` is absent.
        """
        # Ordered de-duplication: a set would make column order depend on
        # string-hash randomisation.
        wanted = dict.fromkeys((self.signal, *self.risk_on, *self.risk_off))
        cols = [c for c in wanted if c in data.columns]
        sym_to_ix = {s: ix for ix, s in enumerate(cols)}
        w = _empty_weights(data, cols)
        if self.signal not in sym_to_ix:
            return w.shift(1).fillna(0.0)

        sig_arr = data[self.signal].to_numpy()
        # Per-symbol close arrays (for momentum pick)
        sym_arrays = {sym_to_ix[s]: data[s].to_numpy() for s in cols}
        ron_idx = [sym_to_ix[s] for s in self.risk_on if s in sym_to_ix]
        roff_idx = [sym_to_ix[s] for s in self.risk_off if s in sym_to_ix]
        need = max(self.ma_long, self.vol_window + 1, self.dd_window,
                   self.peak_window, self.mom_lookback + 1) + 1

        current_regime: str | None = None
        bars_in_regime = 0
        pending_regime: str | None = None
        pending_count = 0
        current_sym: int | None = None
        bars_in_sym = 0
        sym_entry_close: float | None = None
        cooloff_remaining = 0

        def assign_one(sym_ix: int, t: int) -> None:
            """Switch the holding to *sym_ix* and record its entry price."""
            nonlocal current_sym, bars_in_sym, sym_entry_close
            current_sym = sym_ix
            bars_in_sym = 0
            # Entry reference is the close of bar t, falling back to the
            # previous close when bar t has no price.
            p = float(sym_arrays[sym_ix][t])
            sym_entry_close = (
                p if not np.isnan(p) else float(sym_arrays[sym_ix][t - 1])
            )

        for t in range(need, len(data)):
            # Signal uses prices through t-1 (PIT lag)
            sig_closes = sig_arr[:t]
            if np.isnan(sig_closes[-1]):
                # No signal print: keep the current holding rather than
                # dropping to an all-zero row for this bar. Counters are
                # intentionally not advanced.
                if current_sym is not None:
                    w.iat[t, current_sym] = 1.0
                continue

            desired = self._desired_regime(sig_closes, current_regime)
            emergency = (
                sig_closes[-1] / sig_closes[-self.dd_window:].max() - 1.0
            ) <= -self.dd_stop
            # Slice per-symbol closes through t-1
            cps = {ix: arr[:t] for ix, arr in sym_arrays.items()}

            # First placement
            if current_regime is None:
                basket = ron_idx if desired == "risk_on" else roff_idx
                pick = self._pick_top(None, basket, cps)
                if pick is None:
                    continue
                current_regime = desired
                bars_in_regime = 0
                assign_one(pick, t)
                w.iat[t, pick] = 1.0
                continue

            bars_in_regime += 1
            bars_in_sym += 1
            if cooloff_remaining > 0:
                cooloff_remaining -= 1

            in_on = current_regime == "risk_on"
            sym_yclose = (
                float(sym_arrays[current_sym][t - 1])
                if current_sym is not None
                and not np.isnan(sym_arrays[current_sym][t - 1])
                else None
            )

            # Stop-loss: only applies to risk-on holdings; starts a cooloff.
            if (in_on and sym_yclose is not None and sym_entry_close
                    and sym_yclose / sym_entry_close - 1.0 <= -self.stop_loss_pct):
                pick = self._pick_top(None, roff_idx, cps)
                if pick is not None:
                    current_regime = "risk_off"
                    bars_in_regime = 0
                    assign_one(pick, t)
                    pending_regime = None
                    pending_count = 0
                    cooloff_remaining = self.cooloff_days
                    w.iat[t, pick] = 1.0
                    continue

            # Emergency dd stop: bypasses min-hold and confirmation.
            if emergency and current_regime != "risk_off":
                pick = self._pick_top(None, roff_idx, cps)
                if pick is not None:
                    current_regime = "risk_off"
                    bars_in_regime = 0
                    assign_one(pick, t)
                    pending_regime = None
                    pending_count = 0
                    w.iat[t, pick] = 1.0
                    continue

            # Regime change with confirm + min-hold + cooloff
            if desired != current_regime:
                if current_regime == "risk_off" and cooloff_remaining > 0:
                    pending_regime = None
                    pending_count = 0
                elif bars_in_regime < self.regime_min_hold:
                    pending_regime = None
                    pending_count = 0
                else:
                    if desired != pending_regime:
                        pending_regime = desired
                        pending_count = 1
                    else:
                        pending_count += 1
                    if pending_count >= self.confirm_days:
                        basket = ron_idx if desired == "risk_on" else roff_idx
                        pick = self._pick_top(None, basket, cps)
                        if pick is None:
                            # Nothing rankable in the new basket: flip the
                            # regime but keep the current symbol.
                            pick = current_sym
                        current_regime = desired
                        bars_in_regime = 0
                        assign_one(pick, t)
                        pending_regime = None
                        pending_count = 0
                        w.iat[t, pick] = 1.0
                        continue
                # Hold prior allocation
                if current_sym is not None:
                    w.iat[t, current_sym] = 1.0
                continue

            # Same regime: possibly rotate within basket
            pending_regime = None
            pending_count = 0
            basket = ron_idx if current_regime == "risk_on" else roff_idx
            top = self._pick_top(None, basket, cps)
            if top is None or top == current_sym:
                if current_sym is not None:
                    w.iat[t, current_sym] = 1.0
                continue
            if bars_in_sym < self.instrument_min_hold:
                if current_sym is not None:
                    w.iat[t, current_sym] = 1.0
                continue
            assign_one(top, t)
            w.iat[t, top] = 1.0

        return w.shift(1).fillna(0.0)
class TrendRiderV4(Strategy):
    """Diversified TrendRider portfolio allocator.

    V3 is a single-instrument state machine. V4 keeps the same broad
    regime idea, but allocates across sleeves: core equity, capped
    leveraged equity, defensive bonds/cash, and inflation hedges. It is
    still PIT-safe through a terminal ``shift(1)``.
    """

    def __init__(
        self,
        signal: str = "SPY",
        core_equity: tuple[str, ...] = ("SPY", "QQQ"),
        leveraged_equity: tuple[str, ...] = ("SSO", "QLD", "UPRO", "TQQQ"),
        defensive: tuple[str, ...] = ("SHY", "IEF", "TLT"),
        inflation: tuple[str, ...] = ("GLD", "DBC"),
        ma_long: int = 200,
        ma_short: int = 50,
        vol_window: int = 20,
        vol_enter: float = 0.14,
        vol_exit: float = 0.20,
        dd_window: int = 40,
        dd_stop: float = 0.05,
        peak_window: int = 20,
        peak_enter: float = 0.02,
        peak_exit: float = 0.05,
        regime_min_hold: int = 15,
        confirm_days: int = 3,
        mom_lookback: int = 63,
        rebal_every: int = 21,
        max_single_weight: float = 0.45,
        max_leveraged_weight: float = 0.90,
        risk_on_targets: tuple[float, float, float, float] = (0.10, 0.85, 0.00, 0.05),
        risk_off_targets: tuple[float, float, float, float] = (0.30, 0.00, 0.50, 0.20),
    ) -> None:
        """Store the configuration.

        Args:
            signal: Column whose closes drive the regime decision.
            core_equity: Unlevered equity sleeve candidates.
            leveraged_equity: Leveraged equity sleeve candidates.
            defensive: Defensive sleeve candidates; the first one present
                in the data also receives any unallocated sleeve budget.
            inflation: Inflation-hedge sleeve candidates.
            ma_long, ma_short, vol_window, vol_enter, vol_exit, dd_window,
            dd_stop, peak_window, peak_enter, peak_exit: Regime gates,
                forwarded to ``TrendRiderV3._desired_regime``.
            regime_min_hold: Bars a regime must last before it may flip.
            confirm_days: Consecutive bars a new regime must be desired.
            mom_lookback: Bars for the momentum rank.
            rebal_every: Bars between scheduled rebalances.
            max_single_weight: Per-symbol cap applied before the final
                renormalisation.
            max_leveraged_weight: Cap on the summed leveraged sleeve,
                applied before redistribution and renormalisation.
            risk_on_targets: (core, leveraged, defensive, inflation)
                sleeve budgets in the risk-on regime.
            risk_off_targets: Same tuple for the risk-off regime.
        """
        self.signal = signal
        self.core_equity = core_equity
        self.leveraged_equity = leveraged_equity
        self.defensive = defensive
        self.inflation = inflation
        self.ma_long = ma_long
        self.ma_short = ma_short
        self.vol_window = vol_window
        self.vol_enter = vol_enter
        self.vol_exit = vol_exit
        self.dd_window = dd_window
        self.dd_stop = dd_stop
        self.peak_window = peak_window
        self.peak_enter = peak_enter
        self.peak_exit = peak_exit
        self.regime_min_hold = regime_min_hold
        self.confirm_days = confirm_days
        self.mom_lookback = mom_lookback
        self.rebal_every = rebal_every
        self.max_single_weight = max_single_weight
        self.max_leveraged_weight = max_leveraged_weight
        self.risk_on_targets = risk_on_targets
        self.risk_off_targets = risk_off_targets

    def _desired_regime(self, closes: np.ndarray, current: str | None) -> str:
        """Delegate the regime decision to ``TrendRiderV3`` with V4's gates.

        A throwaway V3 instance is built from the current attribute
        values on every call, so later attribute changes are honoured.
        """
        return TrendRiderV3(
            signal=self.signal,
            ma_long=self.ma_long,
            ma_short=self.ma_short,
            vol_window=self.vol_window,
            vol_enter=self.vol_enter,
            vol_exit=self.vol_exit,
            dd_window=self.dd_window,
            dd_stop=self.dd_stop,
            peak_window=self.peak_window,
            peak_enter=self.peak_enter,
            peak_exit=self.peak_exit,
        )._desired_regime(closes, current)

    def _sleeve_weights(
        self,
        amount: float,
        basket: tuple[str, ...],
        cols: list[str],
        mom_row: pd.Series,
        vol_row: pd.Series,
        top_n: int,
        require_positive: bool = False,
    ) -> dict[str, float]:
        """Split *amount* across the top momentum names of one sleeve.

        Args:
            amount: Budget for the sleeve.
            basket: Candidate symbols.
            cols: Symbols available in the price data.
            mom_row: Momentum per symbol for the current bar.
            vol_row: Annualised volatility per symbol for the current bar.
            top_n: Number of names to keep (at least one is kept).
            require_positive: Drop candidates with non-positive momentum.

        Returns:
            ``{symbol: weight}`` summing to *amount*, inverse-volatility
            weighted; empty when *amount* <= 0 or nothing qualifies.
        """
        if amount <= 0:
            return {}
        candidates = []
        for sym in basket:
            if sym not in cols or sym not in mom_row.index:
                continue
            mom = float(mom_row.get(sym, np.nan))
            if not np.isfinite(mom):
                continue
            if require_positive and mom <= 0:
                continue
            vol = float(vol_row.get(sym, np.nan))
            if not np.isfinite(vol) or vol <= 0:
                vol = 0.20  # placeholder when volatility is unusable
            # Floor at 5% so a very quiet asset cannot dominate the sleeve.
            candidates.append((sym, mom, max(vol, 0.05)))
        if not candidates:
            return {}
        # Stable sort: equal momentum keeps basket order.
        candidates.sort(key=lambda item: item[1], reverse=True)
        selected = candidates[:max(1, top_n)]
        inv_vol = np.array([1.0 / item[2] for item in selected], dtype=float)
        inv_vol = inv_vol / inv_vol.sum()
        return {sym: float(amount * weight)
                for (sym, _, _), weight in zip(selected, inv_vol)}

    def _redistribute(self, row: dict[str, float], excess: float,
                      preferred: list[str]) -> float:
        """Pour *excess* into *preferred* symbols up to the single-name cap.

        Mutates *row* in place, visiting *preferred* in order and skipping
        symbols not already in *row*.

        Returns:
            The part of *excess* that could not be placed.
        """
        remaining = excess
        for sym in preferred:
            if remaining <= 1e-12:
                break
            if sym not in row:
                continue
            spare = max(self.max_single_weight - row.get(sym, 0.0), 0.0)
            add = min(spare, remaining)
            row[sym] = row.get(sym, 0.0) + add
            remaining -= add
        return remaining

    def _apply_caps(self, row: dict[str, float], cols: list[str]) -> dict[str, float]:
        """Apply the leveraged-sleeve and single-name caps, then renormalise.

        Steps: scale the leveraged sleeve down to ``max_leveraged_weight``;
        clip every symbol to ``max_single_weight``; push clipped weight to
        defensive, inflation, then core names; spread anything still left
        over all names with head-room; finally rescale to sum to 1.

        Note: the last two steps can lift individual weights back above
        the caps (for example when the input sums to less than 1).

        Returns:
            ``{symbol: weight}`` restricted to *cols*, without entries at
            or below 1e-10.
        """
        row = {sym: float(weight) for sym, weight in row.items()
               if sym in cols and weight > 1e-12}
        for sym in cols:
            row.setdefault(sym, 0.0)

        leveraged = [sym for sym in self.leveraged_equity if sym in row]
        lev_total = sum(row[sym] for sym in leveraged)
        excess = 0.0
        if lev_total > self.max_leveraged_weight and lev_total > 0:
            scale = self.max_leveraged_weight / lev_total
            for sym in leveraged:
                old = row[sym]
                row[sym] = old * scale
                excess += old - row[sym]

        preferred = [*self.defensive, *self.inflation, *self.core_equity]
        if excess > 1e-12:
            excess = self._redistribute(row, excess, preferred)

        # Bounded loop: clip, redistribute, repeat until nothing is over.
        for _ in range(len(row) + 1):
            over = [sym for sym, weight in row.items()
                    if weight > self.max_single_weight]
            if not over:
                break
            for sym in over:
                excess += row[sym] - self.max_single_weight
                row[sym] = self.max_single_weight
            excess = self._redistribute(row, excess, preferred)
            if excess <= 1e-12:
                break

        if excess > 1e-12:
            # Preferred names are full: spread pro rata to remaining room.
            receivers = [sym for sym in row
                         if row[sym] < self.max_single_weight - 1e-12]
            spare = sum(self.max_single_weight - row[sym] for sym in receivers)
            if spare > 0:
                for sym in receivers:
                    add = excess * (self.max_single_weight - row[sym]) / spare
                    row[sym] += add
                excess = 0.0

        total = sum(row.values())
        if total > 0:
            row = {sym: weight / total for sym, weight in row.items()}
        return {sym: weight for sym, weight in row.items() if weight > 1e-10}

    def _allocate(self, regime: str, cols: list[str],
                  mom_row: pd.Series, vol_row: pd.Series) -> dict[str, float]:
        """Build the capped target weights for *regime* on one bar.

        Any regime other than ``"risk_on"`` uses ``risk_off_targets``. A
        sleeve with no eligible name hands its budget to the first
        defensive symbol present in *cols*; if none is present the budget
        is dropped and the final renormalisation absorbs it.
        """
        targets = (self.risk_on_targets if regime == "risk_on"
                   else self.risk_off_targets)
        core, leveraged, defensive, inflation = targets

        row: dict[str, float] = {sym: 0.0 for sym in cols}
        # (budget, basket, top_n, require_positive) -- only the leveraged
        # sleeve insists on positive momentum.
        sleeves = [
            (core, self.core_equity, 2, False),
            (leveraged, self.leveraged_equity, 2, True),
            (defensive, self.defensive, 2, False),
            (inflation, self.inflation, 2, False),
        ]
        unallocated = 0.0
        for amount, basket, top_n, require_positive in sleeves:
            alloc = self._sleeve_weights(amount, basket, cols, mom_row,
                                         vol_row, top_n, require_positive)
            if not alloc:
                unallocated += amount
                continue
            for sym, weight in alloc.items():
                row[sym] += weight

        if unallocated > 0:
            fallback = next((sym for sym in self.defensive if sym in cols), None)
            if fallback is not None:
                row[fallback] += unallocated
        return self._apply_caps(row, cols)

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Run the regime machine and return lagged sleeve weights.

        Targets are recomputed on a regime change and otherwise every
        ``rebal_every`` bars after warm-up; in between they are carried
        forward. Bars with a NaN signal close are skipped (prior targets
        are held).

        Args:
            data: Price frame; one column per symbol.

        Returns:
            Weights indexed like *data*, columns being the configured
            symbols present in *data* (configuration order), shifted one
            row. All zeros when ``signal`` is absent.
        """
        # Ordered de-duplication: a set would make column order (and the
        # float summation order inside the caps) depend on hash randomisation.
        wanted = dict.fromkeys((
            self.signal,
            *self.core_equity,
            *self.leveraged_equity,
            *self.defensive,
            *self.inflation,
        ))
        cols = [c for c in wanted if c in data.columns]
        if self.signal not in data.columns:
            return _empty_weights(data, cols).shift(1).fillna(0.0)

        n_rows = len(data.index)
        col_ix = {c: j for j, c in enumerate(cols)}
        # NaN marks "no decision on this bar" so ffill can carry targets.
        grid = np.full((n_rows, len(cols)), np.nan)

        signal_arr = data[self.signal].to_numpy()
        prices = data[cols]
        returns = prices.pct_change(fill_method=None)
        momentum = prices.pct_change(self.mom_lookback, fill_method=None)
        vol = returns.rolling(self.vol_window).std() * np.sqrt(252)
        need = max(self.ma_long, self.vol_window + 1, self.dd_window,
                   self.peak_window, self.mom_lookback + 1)

        current_regime: str | None = None
        bars_in_regime = 0
        pending_regime: str | None = None
        pending_count = 0

        for i in range(need, n_rows):
            closes = signal_arr[: i + 1]
            if np.isnan(closes[-1]):
                continue
            desired = self._desired_regime(closes, current_regime)

            regime_changed = False
            if current_regime is None:
                current_regime = desired
                bars_in_regime = 0
                regime_changed = True
            else:
                bars_in_regime += 1
                if desired != current_regime:
                    if bars_in_regime >= self.regime_min_hold:
                        # Count consecutive bars asking for the same flip.
                        if desired != pending_regime:
                            pending_regime = desired
                            pending_count = 1
                        else:
                            pending_count += 1
                        if pending_count >= self.confirm_days:
                            current_regime = desired
                            bars_in_regime = 0
                            pending_regime = None
                            pending_count = 0
                            regime_changed = True
                    else:
                        pending_regime = None
                        pending_count = 0
                else:
                    pending_regime = None
                    pending_count = 0

            if not regime_changed and (i - need) % self.rebal_every != 0:
                continue

            row = self._allocate(
                current_regime,
                cols,
                momentum.iloc[i],
                vol.iloc[i],
            )
            # Positional writes: safe even if the index has repeated labels.
            grid[i, :] = 0.0
            for sym, weight in row.items():
                j = col_ix.get(sym)
                if j is not None:
                    grid[i, j] = weight

        w = pd.DataFrame(grid, index=data.index, columns=cols)
        w = w.ffill().fillna(0.0)
        return w.shift(1).fillna(0.0)