Backtesting engine supporting 11 strategies across US (S&P 500) and CN (CSI 300) markets with open-to-close execution, proportional + fixed per-trade fees. Daily trader (trader.py) with auto/morning/evening/simulate/status commands and cron-friendly `auto` mode for unattended daily runs on a server. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
52 lines
1.9 KiB
Python
52 lines
1.9 KiB
Python
import numpy as np
|
|
import pandas as pd
|
|
from strategies.base import Strategy
|
|
|
|
|
|
class TrendFollowingStrategy(Strategy):
    """
    Per-stock trend following with momentum ranking.

    Two filters applied:
      1. Trend filter: only hold stocks trading above their own moving average
         (individual uptrend, not market-level timing)
      2. Momentum rank: among trending stocks, pick the top_n by 6-month return
         (``momentum_period`` rows; 126 by default)

    This avoids the Multi-Factor problem of all-or-nothing market timing
    while still providing downside protection at the individual stock level.
    """

    def __init__(self, ma_window: int = 150, momentum_period: int = 126, top_n: int = 20) -> None:
        """Store the strategy parameters.

        Args:
            ma_window: Number of rows in the rolling mean used as the trend filter.
            momentum_period: Number of rows spanned by the percentage change
                used as the momentum score.
            top_n: Maximum number of columns given a non-zero weight per row.
        """
        self.ma_window = ma_window
        self.momentum_period = momentum_period
        self.top_n = top_n

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """Build equal-weight target weights for the strongest trending stocks.

        Args:
            data: Table that is rolled along its rows and ranked across its
                columns. NOTE(review): presumably one row per trading day and
                one price column per stock -- confirm against the caller.

        Returns:
            A DataFrame aligned with ``data``. Each row is either all zeros or
            holds ``1 / k`` in the ``k`` selected columns (``k <= top_n``).
            Weights computed from row ``t`` are placed on row ``t + 1``.
        """
        # Per-stock trend filter: price > MA. Comparisons against the NaN
        # head of the rolling mean are False, so those cells never qualify.
        moving_avg = data.rolling(self.ma_window).mean()
        in_uptrend = data > moving_avg

        # Momentum score, kept only where the stock is in an uptrend.
        momentum = data.pct_change(self.momentum_period)
        trending_momentum = momentum.where(in_uptrend)

        # Rank each row from strongest to weakest. method="first" hands out
        # distinct integer ranks, so ties at the cutoff can neither push every
        # tied name out of the selection nor let more than top_n names in
        # (both happen with the default "average" method, e.g. two names tied
        # for places 20-21 both get rank 20.5 and are both dropped).
        ranks = trending_momentum.rank(axis=1, ascending=False, method="first")

        # NaN ranks (not trending, or no momentum value yet) compare False,
        # so only valid trending names can be selected.
        selected = ranks <= self.top_n

        # Equal weight across the selected names; rows with no selection
        # stay flat at zero instead of dividing by zero.
        raw = selected.astype(float)
        held = raw.sum(axis=1)
        signals = raw.div(held.where(held > 0), axis=0).fillna(0.0)

        # No positions until both look-back windows can be filled.
        warmup = max(self.ma_window, self.momentum_period)
        signals.iloc[:warmup] = 0.0

        # Delay by one row; presumably so a weight only depends on data
        # available before the bar it is applied to -- confirm with the engine.
        return signals.shift(1).fillna(0.0)
|