""" Daily trading simulation system. Subcommands: morning — Download today's open prices, run strategy, output trade orders evening — Download close prices, record execution, update portfolio auto — Single daily run (morning + evening in one step, for cron) monitor — Long-running daemon for US+CN, all strategies (one tmux) status — Print current portfolio, P&L, and recent trades simulate — Replay a date range day-by-day (for forward testing) log — View daily log: each day's holdings, cash, and operations compare — Compare all strategies side-by-side (auto-discovers state files) Usage: uv run python trader.py morning --market us --strategy recovery_mom_top10 uv run python trader.py evening --market us --strategy recovery_mom_top10 uv run python trader.py auto --market us --strategy recovery_mom_top10 uv run python trader.py monitor --market us --strategy recovery_mom_top10 uv run python trader.py status --market us --strategy recovery_mom_top10 uv run python trader.py simulate --market us --strategy recovery_mom_top10 \\ --start 2026-01-01 --end 2026-04-04 Monitor mode (recommended for tmux): tmux new -s quant uv run python trader.py monitor --market us --strategy recovery_mom_top10 # Ctrl-B D to detach; tmux attach -t quant to reconnect State is persisted in data/trader_{market}_{strategy}.json between runs. """ import argparse import json import os from datetime import datetime, timedelta import numpy as np import pandas as pd import yfinance as yf import data_manager from strategies.buy_and_hold import BuyAndHoldStrategy from strategies.dual_momentum import DualMomentumStrategy from strategies.inverse_vol import InverseVolatilityStrategy from strategies.momentum import MomentumStrategy from strategies.momentum_quality import MomentumQualityStrategy from strategies.recovery_momentum import RecoveryMomentumStrategy from strategies.trend_following import TrendFollowingStrategy from universe import UNIVERSES # --------------------------------------------------------------------------- # Strategy registry # --------------------------------------------------------------------------- STRATEGY_REGISTRY = { "recovery_mom_top10": lambda **kw: RecoveryMomentumStrategy(top_n=10), "recovery_mom_top20": lambda **kw: RecoveryMomentumStrategy(top_n=20), "recovery_mom_top50": lambda **kw: RecoveryMomentumStrategy(top_n=50), "momentum": lambda **kw: MomentumStrategy(lookback=252, skip=21, top_n=kw.get("top_n", 20)), "momentum_quality": lambda **kw: MomentumQualityStrategy(top_n=kw.get("top_n", 20)), "dual_momentum": lambda **kw: DualMomentumStrategy(top_n=kw.get("top_n", 20)), "inverse_vol": lambda **kw: InverseVolatilityStrategy(vol_window=20), "trend_following": lambda **kw: TrendFollowingStrategy(top_n=kw.get("top_n", 20)), "buy_and_hold": lambda **kw: BuyAndHoldStrategy(), } # --------------------------------------------------------------------------- # Persistent state # --------------------------------------------------------------------------- def _state_path(market: str, strategy_name: str) -> str: return os.path.join("data", f"trader_{market}_{strategy_name}.json") def load_state(market: str, strategy_name: str) -> dict: path = _state_path(market, strategy_name) if os.path.exists(path): with open(path) as f: return json.load(f) return {} def save_state(state: dict, market: str, strategy_name: str) -> None: os.makedirs("data", exist_ok=True) path = _state_path(market, strategy_name) with open(path, "w") as f: json.dump(state, f, indent=2, default=str) def init_state(market: str, strategy_name: str, capital: float) -> dict: return { "strategy": strategy_name, "market": market, "initial_capital": capital, "cash": capital, "holdings": {}, # ticker -> shares (float) "pending_trades": None, # set by morning, consumed by evening "trade_log": [], # list of trade dicts "daily_equity": {}, # date_str -> portfolio value "daily_log": [], # list of per-day snapshots (holdings, cash, trades) "last_morning": None, "last_evening": None, } # --------------------------------------------------------------------------- # Core logic # --------------------------------------------------------------------------- def portfolio_value(holdings: dict, prices: dict, cash: float) -> float: stock_value = sum(holdings.get(t, 0) * prices.get(t, 0) for t in holdings) return cash + stock_value def generate_target_weights(strategy, open_data: pd.DataFrame, target_date) -> dict: """ Run strategy on full open price history, extract target_date's weights. Strategies have an internal shift(1) designed for close prices: weights[t] uses data up to t-1. With open prices (observable same-day), we want the signal that uses data up to today. To get this, we append a dummy future row so the strategy computes one extra day of signals, then we read that last row — which incorporates today's open prices. """ # Append a dummy row one day after the last date so the strategy's # shift(1) produces a valid signal row that uses target_date's open dummy_date = open_data.index[-1] + pd.Timedelta(days=1) dummy_row = pd.DataFrame( [open_data.iloc[-1].values], index=[dummy_date], columns=open_data.columns, ) extended = pd.concat([open_data, dummy_row]) weights = strategy.generate_signals(extended) # The dummy_date row in weights now uses data up to target_date (the real last day) # Read the dummy row — that's today's actual signal if dummy_date in weights.index: row = weights.loc[dummy_date] elif target_date in weights.index: # Fallback: use target_date row (which uses data up to target_date - 1) row = weights.loc[target_date] else: idx = weights.index.get_indexer([target_date], method="ffill") if idx[0] >= 0: row = weights.iloc[idx[0]] else: return {} # Return only non-zero weights return {t: float(w) for t, w in row.items() if w > 1e-6} def compute_trades(holdings: dict, cash: float, target_weights: dict, prices: dict, min_trade_value: float = 50.0, integer_shares: bool = False) -> list[dict]: """ Compute trades needed to move from current holdings to target weights. If integer_shares=True, share deltas are rounded to whole numbers and buys are capped so they don't exceed available cash (accounting for commissions). Sells are processed first to free up cash. Returns list of {ticker, shares_delta, direction, est_value}. """ total = portfolio_value(holdings, prices, cash) raw = [] all_tickers = set(list(target_weights.keys()) + list(holdings.keys())) for ticker in sorted(all_tickers): target_weight = target_weights.get(ticker, 0.0) price = prices.get(ticker, 0.0) if price <= 0: continue target_value = total * target_weight target_shares = target_value / price current_shares = holdings.get(ticker, 0.0) delta = target_shares - current_shares if integer_shares: # Round toward zero for buys (floor), away from zero for sells (floor of abs) if delta > 0: delta = int(delta) # floor: don't overshoot cash else: delta = -int(abs(delta)) # floor of magnitude: don't over-sell trade_value = abs(delta * price) if trade_value < min_trade_value: continue if integer_shares and abs(delta) < 1: continue raw.append({ "ticker": ticker, "shares_delta": int(delta) if integer_shares else round(delta, 4), "direction": "BUY" if delta > 0 else "SELL", "est_value": round(trade_value, 2), "price": round(price, 2), "target_shares": int(round(target_shares)) if integer_shares else round(target_shares, 4), "current_shares": int(current_shares) if integer_shares else round(current_shares, 4), }) return raw def execute_trades(state: dict, trades: list[dict], prices: dict, tx_cost: float = 0.001, fixed_fee: float = 0.0, trade_date: str = "", integer_shares: bool = False) -> None: """Execute trades: update holdings and cash in state, append to trade_log. When integer_shares=True, sells are executed first to free up cash, then buys are executed only if sufficient cash is available. """ holdings = state["holdings"] cash = state["cash"] # When using integer shares, execute sells first to free cash for buys sells = [t for t in trades if t["shares_delta"] < 0] buys = [t for t in trades if t["shares_delta"] > 0] ordered = sells + buys if integer_shares else trades for trade in ordered: ticker = trade["ticker"] delta = trade["shares_delta"] price = prices.get(ticker, trade["price"]) cost = abs(delta * price) commission = cost * tx_cost + fixed_fee if delta > 0: # BUY — skip if insufficient cash in integer mode if integer_shares and (cost + commission) > cash: # Try buying fewer shares that we can afford affordable = int((cash - fixed_fee) / (price * (1 + tx_cost))) if affordable < 1: continue delta = affordable cost = abs(delta * price) commission = cost * tx_cost + fixed_fee cash -= (cost + commission) holdings[ticker] = holdings.get(ticker, 0.0) + delta else: # SELL cash += (cost - commission) holdings[ticker] = holdings.get(ticker, 0.0) + delta # delta is negative # Remove zero holdings if ticker in holdings and abs(holdings[ticker]) < (0.5 if integer_shares else 0.001): del holdings[ticker] state["trade_log"].append({ "date": trade_date, "action": "BUY" if delta > 0 else "SELL", "ticker": ticker, "shares": abs(delta) if integer_shares else round(abs(delta), 4), "price": round(price, 2), "value": round(cost, 2), "commission": round(commission, 2), }) state["cash"] = round(cash, 2) if integer_shares: state["holdings"] = {k: int(round(v)) for k, v in holdings.items() if abs(v) >= 0.5} else: state["holdings"] = {k: round(v, 4) for k, v in holdings.items() if abs(v) >= 0.001} def record_daily_snapshot(state: dict, date_str: str, prices: dict, trades: list[dict], prev_equity: float) -> None: """Append a daily snapshot to state['daily_log']. Each entry captures the full picture: date, holdings with prices/values, cash, total equity, daily return, and today's operations. """ holdings = state["holdings"] cash = state["cash"] total = portfolio_value(holdings, prices, cash) daily_ret = (total / prev_equity - 1) * 100 if prev_equity > 0 else 0.0 # Holdings detail: ticker -> {shares, price, value} positions = {} for ticker, shares in sorted(holdings.items()): p = prices.get(ticker, 0.0) positions[ticker] = { "shares": shares, "price": round(p, 2), "value": round(shares * p, 2), } # Operations: list of {action, ticker, shares, price, value, commission} operations = [] for t in trades: p = prices.get(t["ticker"], t["price"]) operations.append({ "action": t["direction"], "ticker": t["ticker"], "shares": abs(t["shares_delta"]), "price": round(p, 2), "value": round(abs(t["shares_delta"]) * p, 2), }) entry = { "date": date_str, "equity": round(total, 2), "cash": round(cash, 2), "daily_return_pct": round(daily_ret, 2), "n_positions": len(holdings), "n_trades": len(trades), "holdings": positions, "operations": operations, } if "daily_log" not in state: state["daily_log"] = [] state["daily_log"].append(entry) def get_prices_for_date(tickers: list[str], date_idx, price_df: pd.DataFrame) -> dict: """Extract prices for specific tickers on a given date from a DataFrame.""" if date_idx in price_df.index: row = price_df.loc[date_idx] else: idx = price_df.index.get_indexer([date_idx], method="ffill") if idx[0] >= 0: row = price_df.iloc[idx[0]] else: return {} return {t: float(row[t]) for t in tickers if t in row.index and pd.notna(row[t])} # --------------------------------------------------------------------------- # Commands # --------------------------------------------------------------------------- def cmd_morning(args): """Morning: download open prices, generate today's trade orders.""" market = args.market strategy_name = args.strategy universe = UNIVERSES[market] tickers = universe["fetch"]() benchmark = universe["benchmark"] all_tickers = sorted(set(tickers + [benchmark])) # Load or init state state = load_state(market, strategy_name) if not state: state = init_state(market, strategy_name, args.capital) print(f"--- Initialized new portfolio: ${args.capital:,.0f} cash ---") # Download data (close + open) close_data, open_data = data_manager.update(market, all_tickers, with_open=True) tickers = [t for t in tickers if t in close_data.columns] today = open_data.index[-1] today_str = str(today.date()) if state["last_morning"] == today_str: print(f"Morning already run for {today_str}. Showing pending trades.\n") print(f"\n{'='*60}") print(f" MORNING SIGNAL — {today_str}") print(f" Strategy: {strategy_name} | Market: {market.upper()}") print(f"{'='*60}") # Run strategy on open prices strategy = STRATEGY_REGISTRY[strategy_name](top_n=max(5, len(tickers) // 10)) target_weights = generate_target_weights(strategy, open_data[tickers], today) # Get today's open prices for trade computation open_prices = get_prices_for_date( list(set(list(target_weights.keys()) + list(state["holdings"].keys()))), today, open_data ) total = portfolio_value(state["holdings"], open_prices, state["cash"]) trades = compute_trades(state["holdings"], state["cash"], target_weights, open_prices, min_trade_value=max(50, total * 0.001), integer_shares=args.integer_shares) # Store pending state["pending_trades"] = { "date": today_str, "target_weights": target_weights, "trades": trades, } state["last_morning"] = today_str save_state(state, market, strategy_name) # Print summary print(f"\n Portfolio value (at open): ${total:,.2f}") print(f" Cash: ${state['cash']:,.2f}") print(f" Target positions: {len(target_weights)}") print(f" Trades needed: {len(trades)}") if target_weights: print(f"\n {'Target Weights':}") print(f" {'Ticker':<8} {'Weight':>8} {'Open Price':>12}") print(f" {'-'*30}") for t, w in sorted(target_weights.items(), key=lambda x: -x[1]): p = open_prices.get(t, 0) print(f" {t:<8} {w:>7.1%} {p:>11.2f}") if trades: print(f"\n {'Trade Orders (execute at close)':}") print(f" {'Action':<6} {'Ticker':<8} {'Shares':>10} {'~Value':>12} {'Open':>10}") print(f" {'-'*48}") total_buy = 0 total_sell = 0 for t in trades: print(f" {t['direction']:<6} {t['ticker']:<8} {t['shares_delta']:>+10.2f} " f"${t['est_value']:>10,.2f} {t['price']:>10.2f}") if t["direction"] == "BUY": total_buy += t["est_value"] else: total_sell += t["est_value"] print(f"\n Total buys: ${total_buy:>12,.2f}") print(f" Total sells: ${total_sell:>12,.2f}") print(f" Net flow: ${total_sell - total_buy:>+12,.2f}") else: print("\n No trades needed today.") print() def cmd_evening(args): """Evening: record execution at close prices, update portfolio.""" market = args.market strategy_name = args.strategy universe = UNIVERSES[market] tickers = universe["fetch"]() benchmark = universe["benchmark"] all_tickers = sorted(set(tickers + [benchmark])) state = load_state(market, strategy_name) if not state: print("No state found. Run 'morning' first.") return if not state.get("pending_trades"): print("No pending trades. Run 'morning' first.") return pending = state["pending_trades"] trade_date = pending["date"] if state["last_evening"] == trade_date: print(f"Evening already recorded for {trade_date}.") return # Get close prices close_data = data_manager.update(market, all_tickers) tickers = [t for t in tickers if t in close_data.columns] target_date = pd.Timestamp(trade_date) all_held = list(set( list(pending.get("target_weights", {}).keys()) + list(state["holdings"].keys()) )) close_prices = get_prices_for_date(all_held, target_date, close_data) # Recompute trades at close prices for accurate execution trades = pending.get("trades", []) print(f"\n{'='*60}") print(f" EVENING EXECUTION — {trade_date}") print(f" Strategy: {strategy_name} | Market: {market.upper()}") print(f"{'='*60}") pre_value = portfolio_value(state["holdings"], close_prices, state["cash"]) print(f"\n Pre-trade value: ${pre_value:,.2f}") # Recompute trades with close prices for execution target_weights = pending.get("target_weights", {}) exec_trades = compute_trades( state["holdings"], state["cash"], target_weights, close_prices, min_trade_value=max(50, pre_value * 0.001), integer_shares=args.integer_shares ) execute_trades(state, exec_trades, close_prices, tx_cost=args.tx_cost, fixed_fee=args.fixed_fee, trade_date=trade_date, integer_shares=args.integer_shares) post_value = portfolio_value(state["holdings"], close_prices, state["cash"]) state["daily_equity"][trade_date] = round(post_value, 2) state["pending_trades"] = None state["last_evening"] = trade_date save_state(state, market, strategy_name) n_executed = len(exec_trades) total_commission = sum(t.get("commission", 0) for t in state["trade_log"][-n_executed:]) if n_executed else 0 print(f" Trades executed: {n_executed}") print(f" Total commission: ${total_commission:,.2f}") print(f" Post-trade value: ${post_value:,.2f}") print(f" Cash remaining: ${state['cash']:,.2f}") print(f" Holdings: {len(state['holdings'])} positions") if exec_trades: print(f"\n {'Executed':}") print(f" {'Action':<6} {'Ticker':<8} {'Shares':>10} {'Close':>10} {'Value':>12}") print(f" {'-'*48}") for t in exec_trades: p = close_prices.get(t["ticker"], t["price"]) v = abs(t["shares_delta"]) * p print(f" {t['direction']:<6} {t['ticker']:<8} {t['shares_delta']:>+10.2f} " f"{p:>10.2f} ${v:>10,.2f}") pnl = post_value - state["initial_capital"] pnl_pct = pnl / state["initial_capital"] * 100 print(f"\n P&L since inception: ${pnl:>+,.2f} ({pnl_pct:>+.2f}%)") print() def cmd_status(args): """Print current portfolio status.""" market = args.market strategy_name = args.strategy state = load_state(market, strategy_name) if not state: print("No state found. Run 'morning' to initialize.") return # Get latest prices universe = UNIVERSES[market] tickers = universe["fetch"]() benchmark = universe["benchmark"] all_tickers = sorted(set(tickers + [benchmark])) close_data = data_manager.update(market, all_tickers) last_date = close_data.index[-1] all_held = list(state["holdings"].keys()) prices = get_prices_for_date(all_held + [benchmark], last_date, close_data) total = portfolio_value(state["holdings"], prices, state["cash"]) pnl = total - state["initial_capital"] print(f"\n{'='*60}") print(f" PORTFOLIO STATUS") print(f" Strategy: {strategy_name} | Market: {market.upper()}") print(f" Last data: {last_date.date()}") print(f"{'='*60}") print(f"\n Initial capital: ${state['initial_capital']:>12,.2f}") print(f" Current value: ${total:>12,.2f}") print(f" Cash: ${state['cash']:>12,.2f}") print(f" P&L: ${pnl:>+12,.2f} ({pnl/state['initial_capital']*100:>+.2f}%)") print(f" Last morning: {state.get('last_morning', 'N/A')}") print(f" Last evening: {state.get('last_evening', 'N/A')}") if state["holdings"]: print(f"\n {'Holdings':}") print(f" {'Ticker':<8} {'Shares':>10} {'Price':>10} {'Value':>12} {'Weight':>8}") print(f" {'-'*52}") stock_value = total - state["cash"] for ticker, shares in sorted(state["holdings"].items(), key=lambda x: -x[1] * prices.get(x[0], 0)): p = prices.get(ticker, 0) v = shares * p w = v / total if total > 0 else 0 print(f" {ticker:<8} {shares:>10.2f} {p:>10.2f} ${v:>10,.2f} {w:>7.1%}") print(f" {'Cash':<8} {'':>10} {'':>10} ${state['cash']:>10,.2f} " f"{state['cash']/total*100 if total > 0 else 0:>6.1f}%") else: print("\n No holdings (100% cash)") # Equity curve if state.get("daily_equity"): eq = state["daily_equity"] dates = sorted(eq.keys()) print(f"\n {'Equity History (last 10 days)':}") print(f" {'Date':<12} {'Value':>12} {'Daily':>8}") print(f" {'-'*34}") for i, d in enumerate(dates[-10:]): v = eq[d] prev = eq[dates[dates.index(d) - 1]] if dates.index(d) > 0 else state["initial_capital"] daily_ret = (v / prev - 1) * 100 print(f" {d:<12} ${v:>10,.2f} {daily_ret:>+7.2f}%") # Recent trades log = state.get("trade_log", []) if log: recent = log[-15:] print(f"\n {'Recent Trades (last 15)':}") print(f" {'Date':<12} {'Action':<6} {'Ticker':<8} {'Shares':>8} {'Price':>10} {'Value':>10}") print(f" {'-'*58}") for t in recent: print(f" {t['date']:<12} {t['action']:<6} {t['ticker']:<8} " f"{t['shares']:>8.2f} {t['price']:>10.2f} ${t['value']:>8,.2f}") print() def cmd_compare(args): """Compare multiple strategies side-by-side.""" import glob as _glob market = args.market strategy_names = getattr(args, "strategy", None) # Auto-discover: find all trader_{market}_*.json state files if not strategy_names: pattern = os.path.join("data", f"trader_{market}_*.json") files = sorted(_glob.glob(pattern)) strategy_names = [] for f in files: # Extract strategy name: trader_{market}_{name}.json base = os.path.basename(f) # trader_us_foo.json prefix = f"trader_{market}_" name = base[len(prefix):-len(".json")] # Skip sim_ prefixed (historical simulations) unless no live ones strategy_names.append(name) if not strategy_names: print(f"No state files found for market '{market}' in data/.") print(f"Run 'monitor' or 'auto' first to generate state.") return print(f" Auto-discovered {len(strategy_names)} strategies: " f"{', '.join(strategy_names)}") # Load all states states = {} for name in strategy_names: state = load_state(market, name) if state and state.get("daily_equity"): states[name] = state else: print(f" Warning: no data for '{name}' — skipping") if not states: print("No strategies with data found.") return # Find common date range all_dates = set() for state in states.values(): all_dates.update(state["daily_equity"].keys()) all_dates = sorted(all_dates) if not all_dates: print("No equity data found.") return print(f"\n{'='*80}") print(f" STRATEGY COMPARISON — {market.upper()}") print(f" Period: {all_dates[0]} to {all_dates[-1]} ({len(all_dates)} days)") print(f"{'='*80}") # Summary table results = [] for name, state in states.items(): eq = state["daily_equity"] dates = sorted(eq.keys()) if len(dates) < 2: continue initial = state["initial_capital"] final = eq[dates[-1]] n_days = len(dates) total_ret = (final / initial - 1) * 100 ann_ret = ((final / initial) ** (252 / n_days) - 1) * 100 if n_days > 0 else 0 # Max drawdown values = [eq[d] for d in dates] peak = values[0] max_dd = 0 for v in values: peak = max(peak, v) dd = (v - peak) / peak max_dd = min(max_dd, dd) # Sharpe daily_rets = [] for i in range(1, len(values)): daily_rets.append(values[i] / values[i - 1] - 1) daily_rets = np.array(daily_rets) if daily_rets else np.array([0]) sharpe = (daily_rets.mean() / daily_rets.std() * np.sqrt(252)) if daily_rets.std() > 0 else 0 n_trades = len(state.get("trade_log", [])) total_comm = sum(t.get("commission", 0) for t in state.get("trade_log", [])) results.append({ "name": name, "initial": initial, "final": final, "total_ret": total_ret, "ann_ret": ann_ret, "sharpe": sharpe, "max_dd": max_dd * 100, "n_trades": n_trades, "commission": total_comm, "n_days": n_days, "n_positions": len(state.get("holdings", {})), "cash": state.get("cash", 0), }) # Sort by total return descending results.sort(key=lambda r: -r["total_ret"]) # Print ranking print(f"\n {'#':<3} {'Strategy':<25} {'Return':>9} {'Ann.Ret':>9} " f"{'Sharpe':>8} {'MaxDD':>8} {'Trades':>7} {'Final':>14}") print(f" {'─'*87}") for i, r in enumerate(results, 1): print(f" {i:<3} {r['name']:<25} {r['total_ret']:>+8.1f}% {r['ann_ret']:>+8.1f}% " f"{r['sharpe']:>8.2f} {r['max_dd']:>7.1f}% {r['n_trades']:>7} " f"${r['final']:>12,.2f}") # Equity curve comparison (last 20 dates) recent_dates = all_dates[-20:] print(f"\n Equity Curve (last {len(recent_dates)} days):") header = f" {'Date':<12}" for r in results: header += f" {r['name'][:14]:>14}" print(header) print(f" {'─' * (12 + 15 * len(results))}") for d in recent_dates: row = f" {d:<12}" for r in results: eq = states[r["name"]]["daily_equity"] if d in eq: row += f" ${eq[d]:>12,.2f}" else: row += f" {'—':>14}" print(row) # Winner summary if results: best = results[0] worst = results[-1] print(f"\n Best: {best['name']} ({best['total_ret']:>+.1f}%)") if len(results) > 1: print(f" Worst: {worst['name']} ({worst['total_ret']:>+.1f}%)") print(f" Spread: {best['total_ret'] - worst['total_ret']:.1f}pp") print() def cmd_log(args): """Print the daily log: each day's holdings, cash, operations.""" market = args.market strategy_name = args.strategy state = load_state(market, strategy_name) if not state: print("No state found. Run 'simulate' or 'auto' first.") return daily_log = state.get("daily_log", []) if not daily_log: print("No daily log entries. Re-run simulate with the latest code to generate logs.") return # Optional date filter start = args.start if hasattr(args, "start") and args.start else None end = args.end if hasattr(args, "end") and args.end else None filtered = daily_log if start: filtered = [d for d in filtered if d["date"] >= start] if end: filtered = [d for d in filtered if d["date"] <= end] if not filtered: print(f"No log entries in range {start or '...'} to {end or '...'}.") return print(f"\n{'='*80}") print(f" DAILY LOG — {strategy_name} | {market.upper()}") print(f" {filtered[0]['date']} to {filtered[-1]['date']} ({len(filtered)} days)") print(f"{'='*80}") for entry in filtered: d = entry["date"] eq = entry["equity"] cash = entry["cash"] ret = entry["daily_return_pct"] ops = entry.get("operations", []) holds = entry.get("holdings", {}) print(f"\n┌─ {d} equity: ${eq:>10,.2f} daily: {ret:>+.2f}% " f"cash: ${cash:>10,.2f} positions: {entry['n_positions']}") # Holdings if holds: print(f"│ {'Ticker':<8} {'Shares':>8} {'Price':>10} {'Value':>12} {'Weight':>8}") print(f"│ {'─'*50}") for ticker in sorted(holds, key=lambda t: -holds[t]["value"]): h = holds[ticker] w = h["value"] / eq * 100 if eq > 0 else 0 print(f"│ {ticker:<8} {h['shares']:>8} {h['price']:>10.2f} " f"${h['value']:>10,.2f} {w:>7.1f}%") cash_w = cash / eq * 100 if eq > 0 else 0 print(f"│ {'Cash':<8} {'':>8} {'':>10} ${cash:>10,.2f} {cash_w:>7.1f}%") # Operations if ops: print(f"│") print(f"│ Operations:") print(f"│ {'Action':<6} {'Ticker':<8} {'Shares':>8} {'Price':>10} {'Value':>12}") print(f"│ {'─'*46}") for op in ops: sign = "+" if op["action"] == "BUY" else "-" print(f"│ {op['action']:<6} {op['ticker']:<8} {sign}{op['shares']:>7} " f"{op['price']:>10.2f} ${op['value']:>10,.2f}") else: print(f"│ (no trades)") print(f"└{'─'*79}") # Summary first = filtered[0] last = filtered[-1] total_ops = sum(e["n_trades"] for e in filtered) trade_days = sum(1 for e in filtered if e["n_trades"] > 0) print(f"\n Period: {first['date']} → {last['date']}") print(f" Start equity: ${first['equity']:>12,.2f}") print(f" End equity: ${last['equity']:>12,.2f}") print(f" Return: {(last['equity']/first['equity']-1)*100:>+11.2f}%") print(f" Total trades: {total_ops:>11}") print(f" Trade days: {trade_days:>11} / {len(filtered)}") print() def cmd_simulate(args): """Simulate day-by-day over a date range.""" market = args.market strategy_name = args.strategy universe = UNIVERSES[market] tickers = universe["fetch"]() benchmark = universe["benchmark"] all_tickers = sorted(set(tickers + [benchmark])) # Load both open and close data close_data, open_data = data_manager.update(market, all_tickers, with_open=True) tickers = [t for t in tickers if t in close_data.columns] # Date range start = pd.Timestamp(args.start) end = pd.Timestamp(args.end) trading_days = close_data.index[(close_data.index >= start) & (close_data.index <= end)] if len(trading_days) == 0: print(f"No trading days found between {args.start} and {args.end}") return # Fresh state for simulation state = init_state(market, f"sim_{strategy_name}", args.capital) # Pre-compute strategy weights for the full period (much faster than per-day) # Append dummy row so shift(1) inside strategy produces valid last-day signal strategy = STRATEGY_REGISTRY[strategy_name](top_n=max(5, len(tickers) // 10)) open_tickers = open_data[tickers] dummy_date = open_tickers.index[-1] + pd.Timedelta(days=1) dummy_row = pd.DataFrame( [open_tickers.iloc[-1].values], index=[dummy_date], columns=open_tickers.columns ) extended = pd.concat([open_tickers, dummy_row]) raw_weights = strategy.generate_signals(extended) # Strategy's shift(1): raw_weights[t] uses data up to t-1 # For date t, the signal using open[t] is in raw_weights[t+1] # So shift(-1) to align: full_weights[t] = signal using open[t] full_weights = raw_weights.shift(-1).fillna(0.0) print(f"\n{'='*70}") print(f" SIMULATION: {strategy_name} | {market.upper()}") print(f" Period: {trading_days[0].date()} to {trading_days[-1].date()} " f"({len(trading_days)} trading days)") print(f" Capital: ${args.capital:,.0f}") print(f"{'='*70}\n") total_trades = 0 total_commission = 0.0 for i, day in enumerate(trading_days): day_str = str(day.date()) # Morning: get target weights from pre-computed matrix if day in full_weights.index: row = full_weights.loc[day] target_weights = {t: float(w) for t, w in row.items() if w > 1e-6} else: target_weights = {} # Get open and close prices all_held = list(set(list(target_weights.keys()) + list(state["holdings"].keys()))) open_prices = get_prices_for_date(all_held, day, open_data) close_prices = get_prices_for_date(all_held + [benchmark], day, close_data) if not close_prices: continue # Compute portfolio value before trades (at close, reflecting overnight changes) pre_value = portfolio_value(state["holdings"], close_prices, state["cash"]) # Compute and execute trades at close prices trades = compute_trades( state["holdings"], state["cash"], target_weights, close_prices, min_trade_value=max(50, pre_value * 0.001), integer_shares=args.integer_shares ) execute_trades(state, trades, close_prices, tx_cost=args.tx_cost, fixed_fee=args.fixed_fee, trade_date=day_str, integer_shares=args.integer_shares) post_value = portfolio_value(state["holdings"], close_prices, state["cash"]) state["daily_equity"][day_str] = round(post_value, 2) # Record daily snapshot (holdings + operations) prev_eq_val = args.capital if i == 0 else list(state["daily_equity"].values())[-2] if len(state["daily_equity"]) >= 2 else args.capital record_daily_snapshot(state, day_str, close_prices, trades, prev_eq_val) day_trades = len(trades) day_commission = sum( t.get("commission", 0) for t in state["trade_log"][-day_trades:] ) if day_trades else 0 total_trades += day_trades total_commission += day_commission # Progress: print weekly or on trade days if day_trades > 0 or i == 0 or i == len(trading_days) - 1: prev_date = dates[-1] if (dates := sorted(state["daily_equity"].keys())[:-1]) else None prev_val = state["daily_equity"].get(prev_date, args.capital) if prev_date else args.capital daily_ret = (post_value / prev_val - 1) * 100 if prev_val > 0 else 0 print(f" {day_str} ${post_value:>12,.2f} {daily_ret:>+7.2f}% " f"trades: {day_trades:>2} positions: {len(state['holdings']):>2}") # Final summary equity = state["daily_equity"] dates = sorted(equity.keys()) final = equity[dates[-1]] initial = args.capital total_ret = (final / initial - 1) * 100 n_days = len(dates) ann_ret = ((final / initial) ** (252 / n_days) - 1) * 100 if n_days > 0 else 0 # Max drawdown values = [equity[d] for d in dates] peak = values[0] max_dd = 0 for v in values: peak = max(peak, v) dd = (v - peak) / peak max_dd = min(max_dd, dd) # Daily returns for Sharpe daily_rets = [] for i in range(1, len(values)): daily_rets.append(values[i] / values[i - 1] - 1) daily_rets = np.array(daily_rets) if daily_rets else np.array([0]) sharpe = (daily_rets.mean() / daily_rets.std() * np.sqrt(252)) if daily_rets.std() > 0 else 0 print(f"\n{'='*70}") print(f" SIMULATION RESULTS") print(f"{'='*70}") print(f" Initial capital: ${initial:>12,.2f}") print(f" Final value: ${final:>12,.2f}") print(f" Total return: {total_ret:>+11.2f}%") print(f" Annualized return: {ann_ret:>+11.2f}%") print(f" Sharpe ratio: {sharpe:>11.2f}") print(f" Max drawdown: {max_dd * 100:>11.2f}%") print(f" Total trades: {total_trades:>11}") print(f" Total commission: ${total_commission:>11,.2f}") print(f" Trading days: {n_days:>11}") # Save state for review state["strategy"] = strategy_name state["market"] = market save_state(state, market, f"sim_{strategy_name}") print(f"\n State saved to: {_state_path(market, f'sim_{strategy_name}')}") # Benchmark comparison bench_start = close_data[benchmark].loc[trading_days[0]] bench_end = close_data[benchmark].loc[trading_days[-1]] bench_ret = (bench_end / bench_start - 1) * 100 print(f"\n Benchmark ({benchmark}): {bench_ret:>+.2f}%") print(f" Alpha: {total_ret - bench_ret:>+.2f}%") print() def cmd_monitor(args): """ Long-running monitor — runs in a tmux session, automatically executes daily. Manages ALL markets (US + CN) in a single process. Each market has its own timezone and two daily phases (morning + evening). All strategies run for every market. Usage: tmux new -s quant uv run python trader.py monitor # Ctrl-B D to detach """ import copy import time as _time import zoneinfo markets = args.market # list of markets strategies = args.strategy # list of strategy names # Market schedule configuration — two phases per day MARKET_CONFIG = { "us": { "tz": zoneinfo.ZoneInfo("America/New_York"), "open_hour": 9, "open_min": 30, "open_buffer": 15, # run at 9:45 AM ET "close_hour": 16, "close_min": 0, "close_buffer": 35, # run at 4:35 PM ET "label": "US (NYSE/NASDAQ)", }, "cn": { "tz": zoneinfo.ZoneInfo("Asia/Shanghai"), "open_hour": 9, "open_min": 30, "open_buffer": 15, # run at 9:45 AM CST "close_hour": 15, "close_min": 0, "close_buffer": 35, # run at 3:35 PM CST "label": "CN (SSE/SZSE)", }, } # Compute run times for each market market_schedules = {} for mkt in markets: cfg = MARKET_CONFIG[mkt] tz = cfg["tz"] morn_h = cfg["open_hour"] morn_m = cfg["open_min"] + cfg["open_buffer"] if morn_m >= 60: morn_h += morn_m // 60 morn_m = morn_m % 60 eve_h = cfg["close_hour"] eve_m = cfg["close_min"] + cfg["close_buffer"] if eve_m >= 60: eve_h += eve_m // 60 eve_m = eve_m % 60 market_schedules[mkt] = { "tz": tz, "label": cfg["label"], "morn_h": morn_h, "morn_m": morn_m, "eve_h": eve_h, "eve_m": eve_m, } # Banner print(f"\n{'='*60}") print(f" MONITOR MODE — {len(markets)} market(s), " f"{len(strategies)} strategies each") print(f" Capital: ${args.capital:,.0f} | " f"Fee: ${args.fixed_fee:.2f}/trade | " f"Integer shares: {args.integer_shares}") for mkt, sched in market_schedules.items(): print(f" {sched['label']}:") print(f" Morning: {sched['morn_h']:02d}:{sched['morn_m']:02d} {sched['tz']}") print(f" Evening: {sched['eve_h']:02d}:{sched['eve_m']:02d} {sched['tz']}") print(f" Strategies: {', '.join(strategies)}") print(f"{'='*60}") # Use UTC as common reference for sleeping utc = zoneinfo.ZoneInfo("UTC") print(f"[monitor] Started at {datetime.now(utc).strftime('%Y-%m-%d %H:%M:%S UTC')}") print(f"[monitor] Press Ctrl+C to stop\n") def _next_events_for_market(mkt, now_utc): """Return list of (utc_datetime, market, phase) for next events.""" sched = market_schedules[mkt] tz = sched["tz"] now_local = now_utc.astimezone(tz) candidates = [] for phase, h, m in [("morning", sched["morn_h"], sched["morn_m"]), ("evening", sched["eve_h"], sched["eve_m"])]: target = now_local.replace(hour=h, minute=m, second=0, microsecond=0) if now_local >= target: target += timedelta(days=1) # Skip weekends while target.weekday() >= 5: target += timedelta(days=1) candidates.append((target.astimezone(utc), mkt, phase)) return candidates def _next_event(now_utc): """Find the globally next event across all markets.""" all_candidates = [] for mkt in markets: all_candidates.extend(_next_events_for_market(mkt, now_utc)) return min(all_candidates, key=lambda x: x[0]) def _run_phase(market, phase, now_utc): """Run all strategies for a market/phase.""" sched = market_schedules[market] tz = sched["tz"] now_local = now_utc.astimezone(tz) print(f"\n[monitor] {'='*55}") print(f"[monitor] {market.upper()} {phase.upper()} at " f"{now_local.strftime('%Y-%m-%d %H:%M:%S %Z')}") print(f"[monitor] {'='*55}") for strat_name in strategies: sub_args = copy.copy(args) sub_args.strategy = strat_name sub_args.market = market print(f"\n[monitor] --- {market.upper()}:{strat_name} ---") try: if phase == "morning": cmd_morning(sub_args) else: state = load_state(market, strat_name) today_str = now_local.strftime("%Y-%m-%d") if (state and state.get("last_morning") == today_str and state.get("pending_trades")): cmd_evening(sub_args) else: print(f"[monitor] Morning not run for " f"{market.upper()}:{strat_name} — using auto") cmd_auto(sub_args) print(f"[monitor] {market.upper()}:{strat_name} {phase} OK") except KeyboardInterrupt: raise except Exception as e: print(f"[monitor] ERROR {market.upper()}:{strat_name} " f"{phase}: {e}") import traceback traceback.print_exc() print(f"[monitor] {market.upper()} {phase} done — " f"{len(strategies)} strategies") while True: now_utc = datetime.now(utc) target_utc, next_mkt, next_phase = _next_event(now_utc) wait_seconds = (target_utc - now_utc).total_seconds() next_tz = market_schedules[next_mkt]["tz"] target_local = target_utc.astimezone(next_tz) print(f"[monitor] {now_utc.strftime('%Y-%m-%d %H:%M UTC')} — " f"Next: {next_mkt.upper()} {next_phase.upper()} at " f"{target_local.strftime('%H:%M %Z %m/%d')} " f"(in {wait_seconds/3600:.1f}h)") # Sleep in chunks while True: now_utc = datetime.now(utc) remaining = (target_utc - now_utc).total_seconds() if remaining <= 0: break chunk = min(remaining, 900) _time.sleep(chunk) now_utc = datetime.now(utc) remaining = (target_utc - now_utc).total_seconds() if remaining > 60: hours_left = remaining / 3600 if int(remaining) % 3600 < 900: print(f"[monitor] {now_utc.strftime('%H:%M UTC')} — " f"waiting... ({hours_left:.1f}h until " f"{next_mkt.upper()} {next_phase})") # Execute now_utc = datetime.now(utc) # Run all events that are due NOW (within 2 min window) — handles # case where US evening and CN morning overlap for mkt in markets: for phase_check in ("morning", "evening"): sched = market_schedules[mkt] tz = sched["tz"] now_local = now_utc.astimezone(tz) if phase_check == "morning": h, m = sched["morn_h"], sched["morn_m"] else: h, m = sched["eve_h"], sched["eve_m"] target_local = now_local.replace( hour=h, minute=m, second=0, microsecond=0) diff = abs((now_local - target_local).total_seconds()) # Fire if within 2-minute window and it's a weekday if diff < 120 and now_local.weekday() < 5: try: _run_phase(mkt, phase_check, now_utc) except KeyboardInterrupt: raise except Exception as e: print(f"[monitor] ERROR in {mkt} {phase_check}: {e}") import traceback traceback.print_exc() print() def cmd_auto(args): """ Automated daily run — single invocation handles both morning + evening. Designed for cron/systemd: run once after market close each trading day. Downloads open + close prices for today, generates signals, executes, records. Usage with cron (US market, run at 5pm ET weekdays): 0 17 * * 1-5 cd /path/to/quant && uv run python trader.py auto --market us For CN market (run at 4pm CST): 0 16 * * 1-5 cd /path/to/quant && uv run python trader.py auto --market cn """ import time market = args.market strategy_name = args.strategy universe = UNIVERSES[market] tickers = universe["fetch"]() benchmark = universe["benchmark"] all_tickers = sorted(set(tickers + [benchmark])) # Load or init state state = load_state(market, strategy_name) if not state: state = init_state(market, strategy_name, args.capital) print(f"[auto] Initialized new portfolio: ${args.capital:,.0f} cash") # Download data (close + open) close_data, open_data = data_manager.update(market, all_tickers, with_open=True) tickers = [t for t in tickers if t in close_data.columns] today = close_data.index[-1] today_str = str(today.date()) # Skip if already processed today if state.get("last_evening") == today_str: print(f"[auto] {today_str} already processed. Nothing to do.") return print(f"\n[auto] {'='*55}") print(f"[auto] {today_str} | {strategy_name} | {market.upper()}") print(f"[auto] {'='*55}") # --- MORNING PHASE: Generate target weights from open prices --- strategy = STRATEGY_REGISTRY[strategy_name](top_n=max(5, len(tickers) // 10)) target_weights = generate_target_weights(strategy, open_data[tickers], today) all_held = list(set(list(target_weights.keys()) + list(state["holdings"].keys()))) close_prices = get_prices_for_date(all_held + [benchmark], today, close_data) if not close_prices: print(f"[auto] No close prices for {today_str}. Market likely closed.") return pre_value = portfolio_value(state["holdings"], close_prices, state["cash"]) print(f"[auto] Pre-trade value: ${pre_value:,.2f}") print(f"[auto] Target positions: {len(target_weights)}") # --- EVENING PHASE: Execute trades at close prices --- trades = compute_trades( state["holdings"], state["cash"], target_weights, close_prices, min_trade_value=max(50, pre_value * 0.001), integer_shares=args.integer_shares ) execute_trades(state, trades, close_prices, tx_cost=args.tx_cost, fixed_fee=args.fixed_fee, trade_date=today_str, integer_shares=args.integer_shares) post_value = portfolio_value(state["holdings"], close_prices, state["cash"]) state["daily_equity"][today_str] = round(post_value, 2) # Record daily snapshot eq_vals = list(state["daily_equity"].values()) prev_eq = eq_vals[-2] if len(eq_vals) >= 2 else state["initial_capital"] record_daily_snapshot(state, today_str, close_prices, trades, prev_eq) state["last_morning"] = today_str state["last_evening"] = today_str state["pending_trades"] = None save_state(state, market, strategy_name) # Print summary pnl = post_value - state["initial_capital"] pnl_pct = pnl / state["initial_capital"] * 100 n_trades = len(trades) commission = sum(t.get("commission", 0) for t in state["trade_log"][-n_trades:]) if n_trades else 0 print(f"[auto] Trades: {n_trades} | Commission: ${commission:,.2f}") print(f"[auto] Post-trade value: ${post_value:,.2f} | Cash: ${state['cash']:,.2f}") print(f"[auto] P&L: ${pnl:>+,.2f} ({pnl_pct:>+.2f}%)") print(f"[auto] Holdings: {len(state['holdings'])} positions") if trades: for t in trades: p = close_prices.get(t["ticker"], t["price"]) print(f"[auto] {t['direction']:<4} {t['ticker']:<8} {t['shares_delta']:>+10.2f} @ {p:.2f}") # Benchmark bench_eq = state["daily_equity"] dates = sorted(bench_eq.keys()) if len(dates) >= 2: prev_val = bench_eq[dates[-2]] daily_ret = (post_value / prev_val - 1) * 100 print(f"[auto] Daily return: {daily_ret:>+.2f}%") print(f"[auto] State saved: {_state_path(market, strategy_name)}") print() # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description="Daily trading simulation") sub = parser.add_subparsers(dest="command", required=True) # Common args def add_common(p): p.add_argument("--market", choices=UNIVERSES.keys(), default="us") p.add_argument("--strategy", choices=STRATEGY_REGISTRY.keys(), default="recovery_mom_top10") p.add_argument("--capital", type=float, default=100_000) p.add_argument("--tx-cost", type=float, default=0.001, help="Proportional transaction cost (default: 0.001 = 10bps)") p.add_argument("--fixed-fee", type=float, default=0.0, help="Fixed dollar fee per trade") p.add_argument("--integer-shares", action="store_true", default=False, help="Only trade whole shares (no fractional)") # Morning p_morning = sub.add_parser("morning", help="Generate trade orders from open prices") add_common(p_morning) # Evening p_evening = sub.add_parser("evening", help="Record execution at close prices") add_common(p_evening) # Auto (single daily run — for cron/systemd) p_auto = sub.add_parser("auto", help="Automated daily run: signal + execute in one step (for cron)") add_common(p_auto) # Monitor (long-running daemon for tmux) — all markets + all strategies p_monitor = sub.add_parser("monitor", help="Long-running daemon: runs ALL markets & strategies (for tmux)") p_monitor.add_argument("--market", nargs="+", choices=list(UNIVERSES.keys()), default=list(UNIVERSES.keys()), help="Markets to monitor (default: ALL)") p_monitor.add_argument("--strategy", nargs="+", choices=list(STRATEGY_REGISTRY.keys()), default=list(STRATEGY_REGISTRY.keys()), help="Strategies to run (default: ALL)") p_monitor.add_argument("--capital", type=float, default=10_000) p_monitor.add_argument("--tx-cost", type=float, default=0.001, help="Proportional transaction cost (default: 0.001 = 10bps)") p_monitor.add_argument("--fixed-fee", type=float, default=2.0, help="Fixed dollar fee per trade (default: $2)") p_monitor.add_argument("--integer-shares", action="store_true", default=True, help="Only trade whole shares (default: True)") # Status (strategy is a free-form string to allow sim_ prefixed names) p_status = sub.add_parser("status", help="Show current portfolio") p_status.add_argument("--market", choices=UNIVERSES.keys(), default="us") p_status.add_argument("--strategy", default="recovery_mom_top10", help="Strategy name (or sim_ for simulation state)") p_status.add_argument("--capital", type=float, default=100_000) p_status.add_argument("--tx-cost", type=float, default=0.001) p_status.add_argument("--fixed-fee", type=float, default=0.0) p_status.add_argument("--integer-shares", action="store_true", default=False) # Simulate p_sim = sub.add_parser("simulate", help="Simulate over a date range") add_common(p_sim) p_sim.add_argument("--start", required=True, help="Start date (YYYY-MM-DD)") p_sim.add_argument("--end", required=True, help="End date (YYYY-MM-DD)") # Log viewer p_log = sub.add_parser("log", help="View daily log (holdings + operations per day)") p_log.add_argument("--market", choices=UNIVERSES.keys(), default="us") p_log.add_argument("--strategy", default="sim_recovery_mom_top10", help="Strategy name (e.g. sim_recovery_mom_top10)") p_log.add_argument("--start", default=None, help="Start date filter (YYYY-MM-DD)") p_log.add_argument("--end", default=None, help="End date filter (YYYY-MM-DD)") # Compare strategies p_cmp = sub.add_parser("compare", help="Compare strategies side-by-side (auto-discovers all if no --strategy)") p_cmp.add_argument("--market", choices=UNIVERSES.keys(), default="us") p_cmp.add_argument("--strategy", nargs="+", default=None, help="Strategy names to compare (default: auto-discover all)") args = parser.parse_args() if args.command == "morning": cmd_morning(args) elif args.command == "evening": cmd_evening(args) elif args.command == "auto": cmd_auto(args) elif args.command == "monitor": cmd_monitor(args) elif args.command == "status": cmd_status(args) elif args.command == "simulate": cmd_simulate(args) elif args.command == "log": cmd_log(args) elif args.command == "compare": cmd_compare(args) if __name__ == "__main__": main()