Files
quant/trader.py

952 lines
36 KiB
Python

"""
Daily trading simulation system.
Subcommands:
morning — Download today's open prices, run strategy, output trade orders
evening — Download close prices, record execution, update portfolio
auto — Single daily run (morning + evening in one step, for cron)
monitor — Long-running daemon that auto-runs daily after market close (for tmux)
status — Print current portfolio, P&L, and recent trades
simulate — Replay a date range day-by-day (for forward testing)
Usage:
uv run python trader.py morning --market us --strategy recovery_mom_top10
uv run python trader.py evening --market us --strategy recovery_mom_top10
uv run python trader.py auto --market us --strategy recovery_mom_top10
uv run python trader.py monitor --market us --strategy recovery_mom_top10
uv run python trader.py status --market us --strategy recovery_mom_top10
uv run python trader.py simulate --market us --strategy recovery_mom_top10 \\
--start 2026-01-01 --end 2026-04-04
Monitor mode (recommended for tmux):
tmux new -s quant
uv run python trader.py monitor --market us --strategy recovery_mom_top10
# Ctrl-B D to detach; tmux attach -t quant to reconnect
State is persisted in data/trader_{market}_{strategy}.json between runs.
"""
import argparse
import json
import os
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import yfinance as yf
import data_manager
from strategies.buy_and_hold import BuyAndHoldStrategy
from strategies.dual_momentum import DualMomentumStrategy
from strategies.inverse_vol import InverseVolatilityStrategy
from strategies.momentum import MomentumStrategy
from strategies.momentum_quality import MomentumQualityStrategy
from strategies.recovery_momentum import RecoveryMomentumStrategy
from strategies.trend_following import TrendFollowingStrategy
from universe import UNIVERSES
# ---------------------------------------------------------------------------
# Strategy registry
# ---------------------------------------------------------------------------
# Maps each CLI strategy name to a factory returning a strategy instance.
# Every factory accepts arbitrary keyword options; only the entries that read
# ``top_n`` honour it (defaulting to 20), the others ignore the options.
STRATEGY_REGISTRY = {
    # Recovery-momentum variants with a fixed portfolio size.
    "recovery_mom_top10": lambda **opts: RecoveryMomentumStrategy(top_n=10),
    "recovery_mom_top20": lambda **opts: RecoveryMomentumStrategy(top_n=20),
    "recovery_mom_top50": lambda **opts: RecoveryMomentumStrategy(top_n=50),
    # Strategies whose portfolio size can be overridden through ``top_n``.
    "momentum": lambda **opts: MomentumStrategy(
        lookback=252, skip=21, top_n=opts.get("top_n", 20)
    ),
    "momentum_quality": lambda **opts: MomentumQualityStrategy(
        top_n=opts.get("top_n", 20)
    ),
    "dual_momentum": lambda **opts: DualMomentumStrategy(
        top_n=opts.get("top_n", 20)
    ),
    "inverse_vol": lambda **opts: InverseVolatilityStrategy(vol_window=20),
    "trend_following": lambda **opts: TrendFollowingStrategy(
        top_n=opts.get("top_n", 20)
    ),
    "buy_and_hold": lambda **opts: BuyAndHoldStrategy(),
}
# ---------------------------------------------------------------------------
# Persistent state
# ---------------------------------------------------------------------------
def _state_path(market: str, strategy_name: str) -> str:
return os.path.join("data", f"trader_{market}_{strategy_name}.json")
def load_state(market: str, strategy_name: str) -> dict:
    """Load the persisted trader state for a market/strategy pair.

    Returns the parsed JSON content of the state file, or an empty dict when
    the state file does not exist.
    """
    state_file = _state_path(market, strategy_name)
    if not os.path.exists(state_file):
        return {}
    with open(state_file) as handle:
        return json.load(handle)
def save_state(state: dict, market: str, strategy_name: str) -> None:
    """Persist ``state`` as indented JSON in the market/strategy state file.

    The JSON is written to a sibling ``.tmp`` file first and then moved over
    the real path with ``os.replace``. An interruption in the middle of the
    write (for example Ctrl+C while the monitor loop is saving) therefore
    cannot leave a truncated state file behind that ``load_state`` would
    fail to parse on every later run.

    Values the ``json`` module cannot serialize natively are stringified
    through ``default=str``.
    """
    path = _state_path(market, strategy_name)
    # Create the directory that holds the state file (``data``).
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(state, f, indent=2, default=str)
        # Swap the fully written file into place in a single step.
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up a partial temp file left by a failed or interrupted write.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def init_state(market: str, strategy_name: str, capital: float) -> dict:
    """Build a brand-new trader state holding only cash.

    ``capital`` is stored both as ``initial_capital`` and as the starting
    ``cash`` balance; all book-keeping containers start empty.
    """
    state: dict = {
        "strategy": strategy_name,
        "market": market,
        "initial_capital": capital,
        "cash": capital,
    }
    state.update({
        "holdings": {},          # ticker -> shares (float)
        "pending_trades": None,  # set by morning, consumed by evening
        "trade_log": [],         # list of trade dicts
        "daily_equity": {},      # date_str -> portfolio value
        "last_morning": None,
        "last_evening": None,
    })
    return state
# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def portfolio_value(holdings: dict, prices: dict, cash: float) -> float:
    """Return cash plus the marked value of every held position.

    A held ticker that has no entry in ``prices`` is valued at zero.
    """
    positions_worth = sum(
        shares * prices.get(ticker, 0) for ticker, shares in holdings.items()
    )
    return cash + positions_worth
def generate_target_weights(strategy, open_data: pd.DataFrame, target_date) -> dict:
    """Run ``strategy`` on the open-price history and return today's weights.

    A synthetic row dated one calendar day after the last row of
    ``open_data`` (a copy of that last row) is appended before calling
    ``strategy.generate_signals``. Per the original author's note, the
    strategies lag their signals by one row (an internal ``shift(1)`` that is
    not visible in this file), so the synthetic row's signal is the one that
    already incorporates the latest real open prices.

    Row selection, in order of preference:
      1. the synthetic row, if the strategy returned it;
      2. the ``target_date`` row;
      3. the latest row at or before ``target_date`` (forward fill).
    If none exists an empty dict is returned.

    Returns a ``{ticker: weight}`` dict containing only weights above 1e-6.
    """
    synthetic_date = open_data.index[-1] + pd.Timedelta(days=1)
    synthetic_row = pd.DataFrame(
        [open_data.iloc[-1].values],
        index=[synthetic_date],
        columns=open_data.columns,
    )
    signal_matrix = strategy.generate_signals(pd.concat([open_data, synthetic_row]))

    def _positive_weights(signal_row) -> dict:
        # Drop zero / negligible (and NaN) entries.
        return {t: float(w) for t, w in signal_row.items() if w > 1e-6}

    if synthetic_date in signal_matrix.index:
        return _positive_weights(signal_matrix.loc[synthetic_date])
    if target_date in signal_matrix.index:
        # Fallback: this row only reflects data before target_date.
        return _positive_weights(signal_matrix.loc[target_date])

    position = signal_matrix.index.get_indexer([target_date], method="ffill")[0]
    if position < 0:
        return {}
    return _positive_weights(signal_matrix.iloc[position])
def compute_trades(holdings: dict, cash: float, target_weights: dict,
                   prices: dict, min_trade_value: float = 50.0) -> list[dict]:
    """Derive the orders that move the current holdings to ``target_weights``.

    The portfolio is valued with ``prices``; each ticker that is held or
    targeted gets a target share count of ``total * weight / price``. Tickers
    without a positive price are skipped, as are orders whose notional value
    is below ``min_trade_value``.

    Returns a list (sorted by ticker) of dicts with the keys ``ticker``,
    ``shares_delta``, ``direction``, ``est_value``, ``price``,
    ``target_shares`` and ``current_shares``.
    """
    equity = portfolio_value(holdings, prices, cash)
    orders = []
    for symbol in sorted(set(target_weights) | set(holdings)):
        quote = prices.get(symbol, 0.0)
        if quote <= 0:
            continue

        held = holdings.get(symbol, 0.0)
        wanted = equity * target_weights.get(symbol, 0.0) / quote
        change = wanted - held
        notional = abs(change * quote)
        if notional < min_trade_value:
            continue

        side = "BUY" if change > 0 else "SELL"
        orders.append({
            "ticker": symbol,
            "shares_delta": round(change, 4),
            "direction": side,
            "est_value": round(notional, 2),
            "price": round(quote, 2),
            "target_shares": round(wanted, 4),
            "current_shares": round(held, 4),
        })
    return orders
def execute_trades(state: dict, trades: list[dict], prices: dict,
                   tx_cost: float = 0.001, fixed_fee: float = 0.0,
                   trade_date: str = "") -> None:
    """Apply ``trades`` to ``state``: adjust cash and holdings, log each fill.

    Each order is filled at ``prices[ticker]``, falling back to the order's
    own ``price``. Commission is ``notional * tx_cost + fixed_fee``. Positions
    whose absolute size drops below 0.001 shares are removed. On return
    ``state["cash"]`` is rounded to 2 decimals, ``state["holdings"]`` to 4,
    and one entry per order has been appended to ``state["trade_log"]``.
    """
    book = state["holdings"]
    balance = state["cash"]

    for order in trades:
        symbol = order["ticker"]
        qty = order["shares_delta"]
        fill_price = prices.get(symbol, order["price"])
        notional = abs(qty * fill_price)
        fee = notional * tx_cost + fixed_fee

        # Buys pay notional plus fee; everything else receives notional minus fee.
        if qty > 0:
            balance = balance - (notional + fee)
        else:
            balance = balance + (notional - fee)

        # qty is negative for sells, so one signed addition covers both sides.
        book[symbol] = book.get(symbol, 0.0) + qty
        if abs(book[symbol]) < 0.001:
            del book[symbol]

        state["trade_log"].append({
            "date": trade_date,
            "action": order["direction"],
            "ticker": symbol,
            "shares": round(abs(qty), 4),
            "price": round(fill_price, 2),
            "value": round(notional, 2),
            "commission": round(fee, 2),
        })

    state["cash"] = round(balance, 2)
    state["holdings"] = {
        sym: round(size, 4) for sym, size in book.items() if abs(size) >= 0.001
    }
def get_prices_for_date(tickers: list[str], date_idx, price_df: pd.DataFrame) -> dict:
    """Return ``{ticker: price}`` for ``tickers`` on ``date_idx``.

    When ``date_idx`` is not in the frame's index, the latest earlier row is
    used (forward fill); if there is none, an empty dict is returned. Tickers
    that are not columns of the frame or whose price is NaN are left out.
    """
    if date_idx not in price_df.index:
        position = price_df.index.get_indexer([date_idx], method="ffill")[0]
        if position < 0:
            return {}
        snapshot = price_df.iloc[position]
    else:
        snapshot = price_df.loc[date_idx]

    quotes = {}
    for symbol in tickers:
        if symbol in snapshot.index and pd.notna(snapshot[symbol]):
            quotes[symbol] = float(snapshot[symbol])
    return quotes
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def cmd_morning(args):
    """Morning step: compute target weights and pending orders from opens.

    Loads (or initializes) the persisted state, refreshes close and open
    data, runs the selected strategy on the open prices, stores the resulting
    target weights and trade list under ``state["pending_trades"]``, saves
    the state and prints a summary.
    """
    market, strategy_name = args.market, args.strategy
    universe = UNIVERSES[market]
    tickers = universe["fetch"]()
    benchmark = universe["benchmark"]
    all_tickers = sorted(set(tickers + [benchmark]))

    # Load or init state
    state = load_state(market, strategy_name)
    if not state:
        state = init_state(market, strategy_name, args.capital)
        print(f"--- Initialized new portfolio: ${args.capital:,.0f} cash ---")

    # Download data (close + open); keep only tickers that have data.
    close_data, open_data = data_manager.update(market, all_tickers, with_open=True)
    tickers = [t for t in tickers if t in close_data.columns]
    today = open_data.index[-1]
    today_str = str(today.date())
    if state["last_morning"] == today_str:
        print(f"Morning already run for {today_str}. Showing pending trades.\n")

    banner = "=" * 60
    print(f"\n{banner}")
    print(f" MORNING SIGNAL — {today_str}")
    print(f" Strategy: {strategy_name} | Market: {market.upper()}")
    print(f"{banner}")

    # Run strategy on open prices
    strategy = STRATEGY_REGISTRY[strategy_name](top_n=max(5, len(tickers) // 10))
    target_weights = generate_target_weights(strategy, open_data[tickers], today)

    # Today's open prices for everything targeted or currently held
    relevant = list(set(target_weights) | set(state["holdings"]))
    open_prices = get_prices_for_date(relevant, today, open_data)
    total = portfolio_value(state["holdings"], open_prices, state["cash"])
    trades = compute_trades(
        state["holdings"],
        state["cash"],
        target_weights,
        open_prices,
        min_trade_value=max(50, total * 0.001),
    )

    # Store pending orders for the evening step
    state["pending_trades"] = {
        "date": today_str,
        "target_weights": target_weights,
        "trades": trades,
    }
    state["last_morning"] = today_str
    save_state(state, market, strategy_name)

    # Print summary
    print(f"\n Portfolio value (at open): ${total:,.2f}")
    print(f" Cash: ${state['cash']:,.2f}")
    print(f" Target positions: {len(target_weights)}")
    print(f" Trades needed: {len(trades)}")

    if target_weights:
        print(f"\n {'Target Weights':}")
        print(f" {'Ticker':<8} {'Weight':>8} {'Open Price':>12}")
        print(f" {'-'*30}")
        # Largest weight first
        for symbol, weight in sorted(target_weights.items(), key=lambda x: -x[1]):
            quote = open_prices.get(symbol, 0)
            print(f" {symbol:<8} {weight:>7.1%} {quote:>11.2f}")

    if not trades:
        print("\n No trades needed today.")
    else:
        print(f"\n {'Trade Orders (execute at close)':}")
        print(f" {'Action':<6} {'Ticker':<8} {'Shares':>10} {'~Value':>12} {'Open':>10}")
        print(f" {'-'*48}")
        total_buy = 0
        total_sell = 0
        for order in trades:
            print(f" {order['direction']:<6} {order['ticker']:<8} {order['shares_delta']:>+10.2f} "
                  f"${order['est_value']:>10,.2f} {order['price']:>10.2f}")
            if order["direction"] == "BUY":
                total_buy += order["est_value"]
            else:
                total_sell += order["est_value"]
        print(f"\n Total buys: ${total_buy:>12,.2f}")
        print(f" Total sells: ${total_sell:>12,.2f}")
        print(f" Net flow: ${total_sell - total_buy:>+12,.2f}")
    print()
def cmd_evening(args):
    """Evening step: execute the pending target weights at close prices.

    Requires a state with ``pending_trades`` written by the morning step.
    Trades are recomputed from the pending target weights using the close
    prices of the pending date, executed, and the resulting portfolio value
    is recorded in ``state["daily_equity"]``. The pending entry is cleared
    and the state saved.

    If no close price can be found at all (mirroring the guard in
    ``cmd_auto``), nothing is executed and the pending trades are kept so the
    step can be retried once data is available.
    """
    market = args.market
    strategy_name = args.strategy
    universe = UNIVERSES[market]
    tickers = universe["fetch"]()
    benchmark = universe["benchmark"]
    all_tickers = sorted(set(tickers + [benchmark]))

    state = load_state(market, strategy_name)
    if not state:
        print("No state found. Run 'morning' first.")
        return
    if not state.get("pending_trades"):
        print("No pending trades. Run 'morning' first.")
        return

    pending = state["pending_trades"]
    trade_date = pending["date"]
    if state["last_evening"] == trade_date:
        print(f"Evening already recorded for {trade_date}.")
        return

    # Get close prices
    close_data = data_manager.update(market, all_tickers)
    target_date = pd.Timestamp(trade_date)
    target_weights = pending.get("target_weights", {})
    all_held = list(set(target_weights) | set(state["holdings"]))
    # The benchmark is included (as in cmd_auto) so an all-cash, no-target day
    # still yields a non-empty lookup when price data exists. It never affects
    # valuation or trades, which only look at held/targeted tickers.
    close_prices = get_prices_for_date(all_held + [benchmark], target_date, close_data)
    if not close_prices:
        # Without prices, holdings would be valued at zero and the pending
        # trades silently dropped; keep them for a later retry instead.
        print(f"No close prices for {trade_date}. Market likely closed.")
        return

    print(f"\n{'='*60}")
    print(f" EVENING EXECUTION — {trade_date}")
    print(f" Strategy: {strategy_name} | Market: {market.upper()}")
    print(f"{'='*60}")

    pre_value = portfolio_value(state["holdings"], close_prices, state["cash"])
    print(f"\n Pre-trade value: ${pre_value:,.2f}")

    # Recompute trades with close prices for execution; the morning's trade
    # list was sized with open prices and is only informational.
    exec_trades = compute_trades(
        state["holdings"], state["cash"], target_weights,
        close_prices, min_trade_value=max(50, pre_value * 0.001)
    )
    execute_trades(state, exec_trades, close_prices,
                   tx_cost=args.tx_cost, fixed_fee=args.fixed_fee,
                   trade_date=trade_date)

    post_value = portfolio_value(state["holdings"], close_prices, state["cash"])
    state["daily_equity"][trade_date] = round(post_value, 2)
    state["pending_trades"] = None
    state["last_evening"] = trade_date
    save_state(state, market, strategy_name)

    # execute_trades appended exactly one log entry per executed trade.
    n_executed = len(exec_trades)
    total_commission = sum(t.get("commission", 0)
                           for t in state["trade_log"][-n_executed:]) if n_executed else 0

    print(f" Trades executed: {n_executed}")
    print(f" Total commission: ${total_commission:,.2f}")
    print(f" Post-trade value: ${post_value:,.2f}")
    print(f" Cash remaining: ${state['cash']:,.2f}")
    print(f" Holdings: {len(state['holdings'])} positions")

    if exec_trades:
        print(f"\n {'Executed':}")
        print(f" {'Action':<6} {'Ticker':<8} {'Shares':>10} {'Close':>10} {'Value':>12}")
        print(f" {'-'*48}")
        for t in exec_trades:
            p = close_prices.get(t["ticker"], t["price"])
            v = abs(t["shares_delta"]) * p
            print(f" {t['direction']:<6} {t['ticker']:<8} {t['shares_delta']:>+10.2f} "
                  f"{p:>10.2f} ${v:>10,.2f}")

    pnl = post_value - state["initial_capital"]
    pnl_pct = pnl / state["initial_capital"] * 100
    print(f"\n P&L since inception: ${pnl:>+,.2f} ({pnl_pct:>+.2f}%)")
    print()
def cmd_status(args):
    """Print the current portfolio: value, P&L, holdings, equity, trades.

    Loads the persisted state, refreshes close data, values the holdings at
    the latest available close and prints a report. The state is not
    modified or saved.
    """
    market = args.market
    strategy_name = args.strategy
    state = load_state(market, strategy_name)
    if not state:
        print("No state found. Run 'morning' to initialize.")
        return

    # Get latest prices
    universe = UNIVERSES[market]
    tickers = universe["fetch"]()
    benchmark = universe["benchmark"]
    all_tickers = sorted(set(tickers + [benchmark]))
    close_data = data_manager.update(market, all_tickers)
    last_date = close_data.index[-1]
    all_held = list(state["holdings"].keys())
    prices = get_prices_for_date(all_held + [benchmark], last_date, close_data)

    total = portfolio_value(state["holdings"], prices, state["cash"])
    pnl = total - state["initial_capital"]

    print(f"\n{'='*60}")
    print(f" PORTFOLIO STATUS")
    print(f" Strategy: {strategy_name} | Market: {market.upper()}")
    print(f" Last data: {last_date.date()}")
    print(f"{'='*60}")
    print(f"\n Initial capital: ${state['initial_capital']:>12,.2f}")
    print(f" Current value: ${total:>12,.2f}")
    print(f" Cash: ${state['cash']:>12,.2f}")
    print(f" P&L: ${pnl:>+12,.2f} ({pnl/state['initial_capital']*100:>+.2f}%)")
    # init_state stores these keys as None, so a plain .get() default would
    # never be used; ``or`` makes the N/A placeholder actually appear.
    print(f" Last morning: {state.get('last_morning') or 'N/A'}")
    print(f" Last evening: {state.get('last_evening') or 'N/A'}")

    if state["holdings"]:
        print(f"\n {'Holdings':}")
        print(f" {'Ticker':<8} {'Shares':>10} {'Price':>10} {'Value':>12} {'Weight':>8}")
        print(f" {'-'*52}")
        # Largest position value first.
        for ticker, shares in sorted(state["holdings"].items(),
                                     key=lambda x: -x[1] * prices.get(x[0], 0)):
            p = prices.get(ticker, 0)
            v = shares * p
            w = v / total if total > 0 else 0
            print(f" {ticker:<8} {shares:>10.2f} {p:>10.2f} ${v:>10,.2f} {w:>7.1%}")
        print(f" {'Cash':<8} {'':>10} {'':>10} ${state['cash']:>10,.2f} "
              f"{state['cash']/total*100 if total > 0 else 0:>6.1f}%")
    else:
        print("\n No holdings (100% cash)")

    # Equity curve
    if state.get("daily_equity"):
        eq = state["daily_equity"]
        dates = sorted(eq.keys())
        print(f"\n {'Equity History (last 10 days)':}")
        print(f" {'Date':<12} {'Value':>12} {'Daily':>8}")
        print(f" {'-'*34}")
        # Walk the tail by absolute position so the previous day's value is a
        # direct index lookup instead of a linear search per row.
        first_shown = max(len(dates) - 10, 0)
        for pos, d in enumerate(dates[first_shown:], start=first_shown):
            v = eq[d]
            prev = eq[dates[pos - 1]] if pos > 0 else state["initial_capital"]
            daily_ret = (v / prev - 1) * 100 if prev else 0.0
            print(f" {d:<12} ${v:>10,.2f} {daily_ret:>+7.2f}%")

    # Recent trades
    log = state.get("trade_log", [])
    if log:
        recent = log[-15:]
        print(f"\n {'Recent Trades (last 15)':}")
        print(f" {'Date':<12} {'Action':<6} {'Ticker':<8} {'Shares':>8} {'Price':>10} {'Value':>10}")
        print(f" {'-'*58}")
        for t in recent:
            print(f" {t['date']:<12} {t['action']:<6} {t['ticker']:<8} "
                  f"{t['shares']:>8.2f} {t['price']:>10.2f} ${t['value']:>8,.2f}")
    print()
def cmd_simulate(args):
    """Replay the strategy day by day over ``args.start`` .. ``args.end``.

    Strategy weights are computed once for the full open-price history; each
    trading day in the range is then rebalanced to that day's weights at the
    day's close prices. Prints progress on trade days and on the first/last
    day, then a results summary (return, Sharpe, max drawdown, benchmark
    comparison). The final state is saved under the ``sim_<strategy>`` name.
    """
    market = args.market
    strategy_name = args.strategy
    universe = UNIVERSES[market]
    tickers = universe["fetch"]()
    benchmark = universe["benchmark"]
    all_tickers = sorted(set(tickers + [benchmark]))

    # Load both open and close data
    close_data, open_data = data_manager.update(market, all_tickers, with_open=True)
    tickers = [t for t in tickers if t in close_data.columns]

    # Date range
    start = pd.Timestamp(args.start)
    end = pd.Timestamp(args.end)
    trading_days = close_data.index[(close_data.index >= start) & (close_data.index <= end)]
    if len(trading_days) == 0:
        print(f"No trading days found between {args.start} and {args.end}")
        return

    # Fresh state for simulation
    state = init_state(market, f"sim_{strategy_name}", args.capital)

    # Pre-compute strategy weights for the full period (much faster than per-day).
    # A copy of the last row is appended one day later so the final real day
    # also gets a signal after the re-alignment below.
    strategy = STRATEGY_REGISTRY[strategy_name](top_n=max(5, len(tickers) // 10))
    open_tickers = open_data[tickers]
    dummy_date = open_tickers.index[-1] + pd.Timedelta(days=1)
    dummy_row = pd.DataFrame(
        [open_tickers.iloc[-1].values], index=[dummy_date], columns=open_tickers.columns
    )
    extended = pd.concat([open_tickers, dummy_row])
    raw_weights = strategy.generate_signals(extended)
    # Per the original author's note, strategies lag signals by one row
    # (raw_weights[t] uses data up to t-1), so the signal built from open[t]
    # sits in the following row; shift(-1) realigns it onto t.
    full_weights = raw_weights.shift(-1).fillna(0.0)

    print(f"\n{'='*70}")
    print(f" SIMULATION: {strategy_name} | {market.upper()}")
    print(f" Period: {trading_days[0].date()} to {trading_days[-1].date()} "
          f"({len(trading_days)} trading days)")
    print(f" Capital: ${args.capital:,.0f}")
    print(f"{'='*70}\n")

    total_trades = 0
    total_commission = 0.0
    # Equity recorded on the previous simulated day (the starting capital
    # before the first one); used for the daily return in progress lines.
    prev_val = args.capital
    for i, day in enumerate(trading_days):
        day_str = str(day.date())

        # Morning: get target weights from pre-computed matrix
        if day in full_weights.index:
            row = full_weights.loc[day]
            target_weights = {t: float(w) for t, w in row.items() if w > 1e-6}
        else:
            target_weights = {}

        # Close prices for everything targeted or held (plus the benchmark)
        all_held = list(set(target_weights) | set(state["holdings"]))
        close_prices = get_prices_for_date(all_held + [benchmark], day, close_data)
        if not close_prices:
            continue

        # Compute portfolio value before trades (at close, reflecting overnight changes)
        pre_value = portfolio_value(state["holdings"], close_prices, state["cash"])

        # Compute and execute trades at close prices
        trades = compute_trades(
            state["holdings"], state["cash"], target_weights,
            close_prices, min_trade_value=max(50, pre_value * 0.001)
        )
        execute_trades(state, trades, close_prices,
                       tx_cost=args.tx_cost, fixed_fee=args.fixed_fee,
                       trade_date=day_str)

        post_value = portfolio_value(state["holdings"], close_prices, state["cash"])
        recorded_value = round(post_value, 2)
        state["daily_equity"][day_str] = recorded_value

        # execute_trades appended exactly one log entry per trade.
        day_trades = len(trades)
        day_commission = sum(
            t.get("commission", 0) for t in state["trade_log"][-day_trades:]
        ) if day_trades else 0
        total_trades += day_trades
        total_commission += day_commission

        # Progress: print on trade days and on the first/last day of the range
        if day_trades > 0 or i == 0 or i == len(trading_days) - 1:
            daily_ret = (post_value / prev_val - 1) * 100 if prev_val > 0 else 0
            print(f" {day_str} ${post_value:>12,.2f} {daily_ret:>+7.2f}% "
                  f"trades: {day_trades:>2} positions: {len(state['holdings']):>2}")
        prev_val = recorded_value

    # Final summary
    equity = state["daily_equity"]
    if not equity:
        # Every day in the range was skipped for lack of close prices.
        print(f"No close prices found between {args.start} and {args.end}")
        return
    dates = sorted(equity.keys())
    final = equity[dates[-1]]
    initial = args.capital
    total_ret = (final / initial - 1) * 100
    n_days = len(dates)
    ann_ret = ((final / initial) ** (252 / n_days) - 1) * 100 if n_days > 0 else 0

    # Max drawdown: most negative (value - running peak) / running peak
    values = [equity[d] for d in dates]
    peak = values[0]
    max_dd = 0
    for v in values:
        peak = max(peak, v)
        dd = (v - peak) / peak if peak > 0 else 0.0
        max_dd = min(max_dd, dd)

    # Daily returns for Sharpe (annualized with sqrt(252), no risk-free rate)
    daily_rets = [values[k] / values[k - 1] - 1 for k in range(1, len(values))]
    daily_rets = np.array(daily_rets) if daily_rets else np.array([0])
    sharpe = (daily_rets.mean() / daily_rets.std() * np.sqrt(252)) if daily_rets.std() > 0 else 0

    print(f"\n{'='*70}")
    print(f" SIMULATION RESULTS")
    print(f"{'='*70}")
    print(f" Initial capital: ${initial:>12,.2f}")
    print(f" Final value: ${final:>12,.2f}")
    print(f" Total return: {total_ret:>+11.2f}%")
    print(f" Annualized return: {ann_ret:>+11.2f}%")
    print(f" Sharpe ratio: {sharpe:>11.2f}")
    print(f" Max drawdown: {max_dd * 100:>11.2f}%")
    print(f" Total trades: {total_trades:>11}")
    print(f" Total commission: ${total_commission:>11,.2f}")
    print(f" Trading days: {n_days:>11}")

    # Save state for review (stored under the sim_ name, labelled with the
    # plain strategy name)
    state["strategy"] = strategy_name
    state["market"] = market
    save_state(state, market, f"sim_{strategy_name}")
    print(f"\n State saved to: {_state_path(market, f'sim_{strategy_name}')}")

    # Benchmark comparison
    bench_start = close_data[benchmark].loc[trading_days[0]]
    bench_end = close_data[benchmark].loc[trading_days[-1]]
    bench_ret = (bench_end / bench_start - 1) * 100
    print(f"\n Benchmark ({benchmark}): {bench_ret:>+.2f}%")
    print(f" Alpha: {total_ret - bench_ret:>+.2f}%")
    print()
def cmd_monitor(args):
    """
    Long-running monitor — runs in a tmux session, automatically executes daily.

    Sleeps until market close + buffer, runs ``cmd_auto``, then sleeps until
    the next weekday run time. Weekends are skipped by the scheduler; an
    exception raised by a daily run is printed and the loop continues with
    the next scheduled run. Ctrl+C (KeyboardInterrupt) propagates and stops
    the monitor.

    Usage:
        tmux new -s quant
        uv run python trader.py monitor --market us --strategy recovery_mom_top10
        # Ctrl-B D to detach
    """
    import time as _time
    import zoneinfo

    market = args.market

    # Market schedule configuration
    MARKET_CONFIG = {
        "us": {
            "tz": zoneinfo.ZoneInfo("America/New_York"),
            "close_hour": 16,  # 4:00 PM ET
            "close_min": 0,
            "buffer_min": 35,  # Wait 35 min after close for data settlement
            "label": "US (NYSE/NASDAQ)",
        },
        "cn": {
            "tz": zoneinfo.ZoneInfo("Asia/Shanghai"),
            "close_hour": 15,  # 3:00 PM CST
            "close_min": 0,
            "buffer_min": 35,
            "label": "CN (SSE/SZSE)",
        },
    }
    config = MARKET_CONFIG.get(market)
    if not config:
        print(f"[monitor] Unknown market '{market}'. Supported: {list(MARKET_CONFIG.keys())}")
        return

    tz = config["tz"]
    run_hour = config["close_hour"]
    run_min = config["close_min"] + config["buffer_min"]
    # Handle minute overflow
    if run_min >= 60:
        run_hour += run_min // 60
        run_min = run_min % 60

    print(f"\n{'='*60}")
    print(f" MONITOR MODE — {config['label']}")
    print(f" Strategy: {args.strategy} | Market: {market.upper()}")
    print(f" Daily run at: {run_hour:02d}:{run_min:02d} {tz}")
    print(f" Capital: ${args.capital:,.0f}")
    print(f"{'='*60}")
    print(f"[monitor] Started at {datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S %Z')}")
    print(f"[monitor] Press Ctrl+C to stop\n")

    def _next_run_time(now):
        """Compute the next execution datetime (skip weekends)."""
        # Today's run time
        target = now.replace(hour=run_hour, minute=run_min, second=0, microsecond=0)
        if now >= target:
            # Already past today's run time, schedule for tomorrow
            target += timedelta(days=1)
        # Skip weekends: Saturday=5, Sunday=6
        while target.weekday() >= 5:
            target += timedelta(days=1)
        return target

    while True:
        now = datetime.now(tz)
        next_run = _next_run_time(now)
        wait_seconds = (next_run - now).total_seconds()
        # The two f-strings are concatenated, so the separator must be
        # explicit; without it the timestamp runs straight into "Next run".
        print(f"[monitor] {now.strftime('%Y-%m-%d %H:%M:%S')} | "
              f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S %Z')} "
              f"(in {wait_seconds/3600:.1f}h)")

        # Sleep in chunks so Ctrl+C is responsive and we print heartbeats
        while True:
            now = datetime.now(tz)
            remaining = (next_run - now).total_seconds()
            if remaining <= 0:
                break
            # Sleep in 15-minute chunks, print heartbeat roughly every hour
            chunk = min(remaining, 900)  # 15 min
            _time.sleep(chunk)
            now = datetime.now(tz)
            remaining = (next_run - now).total_seconds()
            # Heartbeat only in the 15-minute window after each whole hour of
            # remaining time, and not in the final minute before the run.
            if remaining > 60 and int(remaining) % 3600 < 900:
                hours_left = remaining / 3600
                print(f"[monitor] {now.strftime('%H:%M:%S')} | "
                      f"waiting... ({hours_left:.1f}h until run)")

        # Time to run!
        now = datetime.now(tz)
        print(f"\n[monitor] {'='*55}")
        print(f"[monitor] Executing daily run at {now.strftime('%Y-%m-%d %H:%M:%S %Z')}")
        print(f"[monitor] {'='*55}")
        try:
            cmd_auto(args)
            print(f"[monitor] Daily run completed successfully.")
        except KeyboardInterrupt:
            raise
        except Exception as e:
            # Top-level daemon boundary: report the failure and keep running.
            print(f"[monitor] ERROR during daily run: {e}")
            import traceback
            traceback.print_exc()
            print(f"[monitor] Will retry on next scheduled run.")
        print()
def cmd_auto(args):
    """
    Automated daily run — single invocation handles both morning + evening.

    Designed for cron/systemd: run once after market close each trading day.
    Refreshes open + close data, generates target weights from the open
    prices, executes the resulting trades at the latest close prices and
    records the day. Does nothing if the latest close date was already
    processed, or if no close prices are available for it.

    Usage with cron (US market, run at 5pm ET weekdays):
        0 17 * * 1-5 cd /path/to/quant && uv run python trader.py auto --market us
    For CN market (run at 4pm CST):
        0 16 * * 1-5 cd /path/to/quant && uv run python trader.py auto --market cn
    """
    market = args.market
    strategy_name = args.strategy
    universe = UNIVERSES[market]
    tickers = universe["fetch"]()
    benchmark = universe["benchmark"]
    all_tickers = sorted(set(tickers + [benchmark]))

    # Load or init state
    state = load_state(market, strategy_name)
    if not state:
        state = init_state(market, strategy_name, args.capital)
        print(f"[auto] Initialized new portfolio: ${args.capital:,.0f} cash")

    # Download data (close + open)
    close_data, open_data = data_manager.update(market, all_tickers, with_open=True)
    tickers = [t for t in tickers if t in close_data.columns]
    # "Today" is the most recent date that has close data.
    today = close_data.index[-1]
    today_str = str(today.date())

    # Skip if already processed today
    if state.get("last_evening") == today_str:
        print(f"[auto] {today_str} already processed. Nothing to do.")
        return

    print(f"\n[auto] {'='*55}")
    print(f"[auto] {today_str} | {strategy_name} | {market.upper()}")
    print(f"[auto] {'='*55}")

    # --- MORNING PHASE: Generate target weights from open prices ---
    strategy = STRATEGY_REGISTRY[strategy_name](top_n=max(5, len(tickers) // 10))
    target_weights = generate_target_weights(strategy, open_data[tickers], today)
    all_held = list(set(target_weights) | set(state["holdings"]))
    close_prices = get_prices_for_date(all_held + [benchmark], today, close_data)
    if not close_prices:
        print(f"[auto] No close prices for {today_str}. Market likely closed.")
        return

    pre_value = portfolio_value(state["holdings"], close_prices, state["cash"])
    print(f"[auto] Pre-trade value: ${pre_value:,.2f}")
    print(f"[auto] Target positions: {len(target_weights)}")

    # --- EVENING PHASE: Execute trades at close prices ---
    trades = compute_trades(
        state["holdings"], state["cash"], target_weights,
        close_prices, min_trade_value=max(50, pre_value * 0.001)
    )
    execute_trades(state, trades, close_prices,
                   tx_cost=args.tx_cost, fixed_fee=args.fixed_fee,
                   trade_date=today_str)

    post_value = portfolio_value(state["holdings"], close_prices, state["cash"])
    state["daily_equity"][today_str] = round(post_value, 2)
    state["last_morning"] = today_str
    state["last_evening"] = today_str
    state["pending_trades"] = None
    save_state(state, market, strategy_name)

    # Print summary
    pnl = post_value - state["initial_capital"]
    pnl_pct = pnl / state["initial_capital"] * 100
    # execute_trades appended exactly one log entry per trade.
    n_trades = len(trades)
    commission = sum(t.get("commission", 0) for t in state["trade_log"][-n_trades:]) if n_trades else 0
    print(f"[auto] Trades: {n_trades} | Commission: ${commission:,.2f}")
    print(f"[auto] Post-trade value: ${post_value:,.2f} | Cash: ${state['cash']:,.2f}")
    print(f"[auto] P&L: ${pnl:>+,.2f} ({pnl_pct:>+.2f}%)")
    print(f"[auto] Holdings: {len(state['holdings'])} positions")
    if trades:
        for t in trades:
            p = close_prices.get(t["ticker"], t["price"])
            print(f"[auto] {t['direction']:<4} {t['ticker']:<8} {t['shares_delta']:>+10.2f} @ {p:.2f}")

    # Portfolio daily return versus the previously recorded equity value
    # (only once at least two days are on record).
    equity_by_date = state["daily_equity"]
    dates = sorted(equity_by_date.keys())
    if len(dates) >= 2:
        prev_val = equity_by_date[dates[-2]]
        daily_ret = (post_value / prev_val - 1) * 100
        print(f"[auto] Daily return: {daily_ret:>+.2f}%")

    print(f"[auto] State saved: {_state_path(market, strategy_name)}")
    print()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Subcommands: morning, evening, auto, monitor, status, simulate. All of
    them take ``--market``, ``--strategy``, ``--capital``, ``--tx-cost`` and
    ``--fixed-fee``; ``simulate`` additionally requires ``--start``/``--end``.
    """
    parser = argparse.ArgumentParser(description="Daily trading simulation")
    sub = parser.add_subparsers(dest="command", required=True)

    def add_common(p):
        """Attach the options shared by the trading subcommands to ``p``."""
        p.add_argument("--market", choices=UNIVERSES.keys(), default="us")
        p.add_argument("--strategy", choices=STRATEGY_REGISTRY.keys(),
                       default="recovery_mom_top10")
        p.add_argument("--capital", type=float, default=100_000)
        p.add_argument("--tx-cost", type=float, default=0.001,
                       help="Proportional transaction cost (default: 0.001 = 10bps)")
        p.add_argument("--fixed-fee", type=float, default=0.0,
                       help="Fixed dollar fee per trade")

    # Subcommands that only need the common options, in registration order:
    # morning, evening, auto (cron/systemd), monitor (tmux daemon).
    simple_commands = (
        ("morning", "Generate trade orders from open prices"),
        ("evening", "Record execution at close prices"),
        ("auto", "Automated daily run: signal + execute in one step (for cron)"),
        ("monitor", "Long-running daemon: auto-runs daily after market close (for tmux)"),
    )
    for command_name, help_text in simple_commands:
        add_common(sub.add_parser(command_name, help=help_text))

    # Status (strategy is a free-form string to allow sim_ prefixed names)
    status_parser = sub.add_parser("status", help="Show current portfolio")
    status_parser.add_argument("--market", choices=UNIVERSES.keys(), default="us")
    status_parser.add_argument("--strategy", default="recovery_mom_top10",
                               help="Strategy name (or sim_<name> for simulation state)")
    status_parser.add_argument("--capital", type=float, default=100_000)
    status_parser.add_argument("--tx-cost", type=float, default=0.001)
    status_parser.add_argument("--fixed-fee", type=float, default=0.0)

    # Simulate
    sim_parser = sub.add_parser("simulate", help="Simulate over a date range")
    add_common(sim_parser)
    sim_parser.add_argument("--start", required=True, help="Start date (YYYY-MM-DD)")
    sim_parser.add_argument("--end", required=True, help="End date (YYYY-MM-DD)")

    args = parser.parse_args()

    handlers = {
        "morning": cmd_morning,
        "evening": cmd_evening,
        "auto": cmd_auto,
        "monitor": cmd_monitor,
        "status": cmd_status,
        "simulate": cmd_simulate,
    }
    handler = handlers.get(args.command)
    if handler is not None:
        handler(args)


if __name__ == "__main__":
    main()