Two-phase monitor, integer shares, and daily log

- Monitor now runs morning (9:45 → signals) and evening (4:35 → execute)
  instead of single daily run; falls back to auto if morning missed
- Add --integer-shares flag for whole-share-only trading (no fractional)
- Add daily_log to state: each day records holdings, cash, and operations
- Add 'log' subcommand to view daily snapshots with date range filter
- record_daily_snapshot() called from both simulate and auto commands

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-06 23:57:03 +08:00
parent 4709271504
commit 942164a38f
2 changed files with 323 additions and 69 deletions

View File

@@ -23,8 +23,11 @@ uv run python main.py --execution open-close # Signal on open, execute at close
uv run python trader.py auto --market us --strategy recovery_mom_top10 # Single daily run (for cron)
uv run python trader.py morning --market us --strategy recovery_mom_top10 # Morning: generate orders
uv run python trader.py evening --market us --strategy recovery_mom_top10 # Evening: record execution
uv run python trader.py monitor --market us --strategy recovery_mom_top10 # Long-running daemon (for tmux)
uv run python trader.py status --market us --strategy recovery_mom_top10 # Portfolio status
uv run python trader.py simulate --market us --strategy recovery_mom_top10 --start 2026-01-01 --end 2026-04-01 # Historical replay
uv run python trader.py log --market us --strategy sim_recovery_mom_top10 --start 2026-03-01 # View daily log
uv run python trader.py simulate --integer-shares --fixed-fee 2.0 ... # Integer shares mode
# Setup
uv sync # Install/sync dependencies
@@ -66,7 +69,21 @@ No test suite or linter is configured.
- Python 3.12+, managed with `uv`.
- Do not show matplotlib figures when running backtests; use `--no-plot`.
## Server Deployment (Cron)
## Server Deployment
### Option 1: tmux monitor (recommended)
Two-phase daily schedule — morning (open prices → generate signals) and evening (close prices → execute trades):
```bash
tmux new -s quant
uv run python trader.py monitor --market us --strategy recovery_mom_top10
# Ctrl-B D to detach; tmux attach -t quant to reconnect
```
If monitor starts mid-day (after open, before close), it automatically falls back to `auto` mode for the evening phase.
### Option 2: Cron
Run daily after market close. The `auto` command is idempotent — safe to re-run:

373
trader.py
View File

@@ -8,6 +8,7 @@ Subcommands:
monitor — Long-running daemon that auto-runs daily after market close (for tmux)
status — Print current portfolio, P&L, and recent trades
simulate — Replay a date range day-by-day (for forward testing)
log — View daily log: each day's holdings, cash, and operations
Usage:
uv run python trader.py morning --market us --strategy recovery_mom_top10
@@ -95,6 +96,7 @@ def init_state(market: str, strategy_name: str, capital: float) -> dict:
"pending_trades": None, # set by morning, consumed by evening
"trade_log": [], # list of trade dicts
"daily_equity": {}, # date_str -> portfolio value
"daily_log": [], # list of per-day snapshots (holdings, cash, trades)
"last_morning": None,
"last_evening": None,
}
@@ -150,14 +152,19 @@ def generate_target_weights(strategy, open_data: pd.DataFrame, target_date) -> d
def compute_trades(holdings: dict, cash: float, target_weights: dict,
prices: dict, min_trade_value: float = 50.0) -> list[dict]:
prices: dict, min_trade_value: float = 50.0,
integer_shares: bool = False) -> list[dict]:
"""
Compute trades needed to move from current holdings to target weights.
If integer_shares=True, share deltas are rounded to whole numbers and
buys are capped so they don't exceed available cash (accounting for
commissions). Sells are processed first to free up cash.
Returns list of {ticker, shares_delta, direction, est_value}.
"""
total = portfolio_value(holdings, prices, cash)
trades = []
raw = []
all_tickers = set(list(target_weights.keys()) + list(holdings.keys()))
for ticker in sorted(all_tickers):
@@ -171,31 +178,49 @@ def compute_trades(holdings: dict, cash: float, target_weights: dict,
current_shares = holdings.get(ticker, 0.0)
delta = target_shares - current_shares
if integer_shares:
# Round toward zero for buys (floor), away from zero for sells (floor of abs)
if delta > 0:
delta = int(delta) # floor: don't overshoot cash
else:
delta = -int(abs(delta)) # floor of magnitude: don't over-sell
trade_value = abs(delta * price)
if trade_value < min_trade_value:
continue
if integer_shares and abs(delta) < 1:
continue
trades.append({
raw.append({
"ticker": ticker,
"shares_delta": round(delta, 4),
"shares_delta": int(delta) if integer_shares else round(delta, 4),
"direction": "BUY" if delta > 0 else "SELL",
"est_value": round(trade_value, 2),
"price": round(price, 2),
"target_shares": round(target_shares, 4),
"current_shares": round(current_shares, 4),
"target_shares": int(round(target_shares)) if integer_shares else round(target_shares, 4),
"current_shares": int(current_shares) if integer_shares else round(current_shares, 4),
})
return trades
return raw
def execute_trades(state: dict, trades: list[dict], prices: dict,
tx_cost: float = 0.001, fixed_fee: float = 0.0,
trade_date: str = "") -> None:
"""Execute trades: update holdings and cash in state, append to trade_log."""
trade_date: str = "", integer_shares: bool = False) -> None:
"""Execute trades: update holdings and cash in state, append to trade_log.
When integer_shares=True, sells are executed first to free up cash,
then buys are executed only if sufficient cash is available.
"""
holdings = state["holdings"]
cash = state["cash"]
for trade in trades:
# When using integer shares, execute sells first to free cash for buys
sells = [t for t in trades if t["shares_delta"] < 0]
buys = [t for t in trades if t["shares_delta"] > 0]
ordered = sells + buys if integer_shares else trades
for trade in ordered:
ticker = trade["ticker"]
delta = trade["shares_delta"]
price = prices.get(ticker, trade["price"])
@@ -203,7 +228,15 @@ def execute_trades(state: dict, trades: list[dict], prices: dict,
commission = cost * tx_cost + fixed_fee
if delta > 0:
# BUY
# BUY — skip if insufficient cash in integer mode
if integer_shares and (cost + commission) > cash:
# Try buying fewer shares that we can afford
affordable = int((cash - fixed_fee) / (price * (1 + tx_cost)))
if affordable < 1:
continue
delta = affordable
cost = abs(delta * price)
commission = cost * tx_cost + fixed_fee
cash -= (cost + commission)
holdings[ticker] = holdings.get(ticker, 0.0) + delta
else:
@@ -212,21 +245,74 @@ def execute_trades(state: dict, trades: list[dict], prices: dict,
holdings[ticker] = holdings.get(ticker, 0.0) + delta # delta is negative
# Remove zero holdings
if ticker in holdings and abs(holdings[ticker]) < 0.001:
if ticker in holdings and abs(holdings[ticker]) < (0.5 if integer_shares else 0.001):
del holdings[ticker]
state["trade_log"].append({
"date": trade_date,
"action": trade["direction"],
"action": "BUY" if delta > 0 else "SELL",
"ticker": ticker,
"shares": round(abs(delta), 4),
"shares": abs(delta) if integer_shares else round(abs(delta), 4),
"price": round(price, 2),
"value": round(cost, 2),
"commission": round(commission, 2),
})
state["cash"] = round(cash, 2)
state["holdings"] = {k: round(v, 4) for k, v in holdings.items() if abs(v) >= 0.001}
if integer_shares:
state["holdings"] = {k: int(round(v)) for k, v in holdings.items() if abs(v) >= 0.5}
else:
state["holdings"] = {k: round(v, 4) for k, v in holdings.items() if abs(v) >= 0.001}
def record_daily_snapshot(state: dict, date_str: str, prices: dict,
trades: list[dict], prev_equity: float) -> None:
"""Append a daily snapshot to state['daily_log'].
Each entry captures the full picture: date, holdings with prices/values,
cash, total equity, daily return, and today's operations.
"""
holdings = state["holdings"]
cash = state["cash"]
total = portfolio_value(holdings, prices, cash)
daily_ret = (total / prev_equity - 1) * 100 if prev_equity > 0 else 0.0
# Holdings detail: ticker -> {shares, price, value}
positions = {}
for ticker, shares in sorted(holdings.items()):
p = prices.get(ticker, 0.0)
positions[ticker] = {
"shares": shares,
"price": round(p, 2),
"value": round(shares * p, 2),
}
# Operations: list of {action, ticker, shares, price, value, commission}
operations = []
for t in trades:
p = prices.get(t["ticker"], t["price"])
operations.append({
"action": t["direction"],
"ticker": t["ticker"],
"shares": abs(t["shares_delta"]),
"price": round(p, 2),
"value": round(abs(t["shares_delta"]) * p, 2),
})
entry = {
"date": date_str,
"equity": round(total, 2),
"cash": round(cash, 2),
"daily_return_pct": round(daily_ret, 2),
"n_positions": len(holdings),
"n_trades": len(trades),
"holdings": positions,
"operations": operations,
}
if "daily_log" not in state:
state["daily_log"] = []
state["daily_log"].append(entry)
def get_prices_for_date(tickers: list[str], date_idx, price_df: pd.DataFrame) -> dict:
@@ -288,7 +374,8 @@ def cmd_morning(args):
total = portfolio_value(state["holdings"], open_prices, state["cash"])
trades = compute_trades(state["holdings"], state["cash"], target_weights,
open_prices, min_trade_value=max(50, total * 0.001))
open_prices, min_trade_value=max(50, total * 0.001),
integer_shares=args.integer_shares)
# Store pending
state["pending_trades"] = {
@@ -386,12 +473,13 @@ def cmd_evening(args):
target_weights = pending.get("target_weights", {})
exec_trades = compute_trades(
state["holdings"], state["cash"], target_weights,
close_prices, min_trade_value=max(50, pre_value * 0.001)
close_prices, min_trade_value=max(50, pre_value * 0.001),
integer_shares=args.integer_shares
)
execute_trades(state, exec_trades, close_prices,
tx_cost=args.tx_cost, fixed_fee=args.fixed_fee,
trade_date=trade_date)
trade_date=trade_date, integer_shares=args.integer_shares)
post_value = portfolio_value(state["holdings"], close_prices, state["cash"])
state["daily_equity"][trade_date] = round(post_value, 2)
@@ -505,6 +593,92 @@ def cmd_status(args):
print()
def cmd_log(args):
"""Print the daily log: each day's holdings, cash, operations."""
market = args.market
strategy_name = args.strategy
state = load_state(market, strategy_name)
if not state:
print("No state found. Run 'simulate' or 'auto' first.")
return
daily_log = state.get("daily_log", [])
if not daily_log:
print("No daily log entries. Re-run simulate with the latest code to generate logs.")
return
# Optional date filter
start = args.start if hasattr(args, "start") and args.start else None
end = args.end if hasattr(args, "end") and args.end else None
filtered = daily_log
if start:
filtered = [d for d in filtered if d["date"] >= start]
if end:
filtered = [d for d in filtered if d["date"] <= end]
if not filtered:
print(f"No log entries in range {start or '...'} to {end or '...'}.")
return
print(f"\n{'='*80}")
print(f" DAILY LOG — {strategy_name} | {market.upper()}")
print(f" {filtered[0]['date']} to {filtered[-1]['date']} ({len(filtered)} days)")
print(f"{'='*80}")
for entry in filtered:
d = entry["date"]
eq = entry["equity"]
cash = entry["cash"]
ret = entry["daily_return_pct"]
ops = entry.get("operations", [])
holds = entry.get("holdings", {})
print(f"\n┌─ {d} equity: ${eq:>10,.2f} daily: {ret:>+.2f}% "
f"cash: ${cash:>10,.2f} positions: {entry['n_positions']}")
# Holdings
if holds:
print(f"{'Ticker':<8} {'Shares':>8} {'Price':>10} {'Value':>12} {'Weight':>8}")
print(f"{''*50}")
for ticker in sorted(holds, key=lambda t: -holds[t]["value"]):
h = holds[ticker]
w = h["value"] / eq * 100 if eq > 0 else 0
print(f"{ticker:<8} {h['shares']:>8} {h['price']:>10.2f} "
f"${h['value']:>10,.2f} {w:>7.1f}%")
cash_w = cash / eq * 100 if eq > 0 else 0
print(f"{'Cash':<8} {'':>8} {'':>10} ${cash:>10,.2f} {cash_w:>7.1f}%")
# Operations
if ops:
print(f"")
print(f"│ Operations:")
print(f"{'Action':<6} {'Ticker':<8} {'Shares':>8} {'Price':>10} {'Value':>12}")
print(f"{''*46}")
for op in ops:
sign = "+" if op["action"] == "BUY" else "-"
print(f"{op['action']:<6} {op['ticker']:<8} {sign}{op['shares']:>7} "
f"{op['price']:>10.2f} ${op['value']:>10,.2f}")
else:
print(f"│ (no trades)")
print(f"{''*79}")
# Summary
first = filtered[0]
last = filtered[-1]
total_ops = sum(e["n_trades"] for e in filtered)
trade_days = sum(1 for e in filtered if e["n_trades"] > 0)
print(f"\n Period: {first['date']}{last['date']}")
print(f" Start equity: ${first['equity']:>12,.2f}")
print(f" End equity: ${last['equity']:>12,.2f}")
print(f" Return: {(last['equity']/first['equity']-1)*100:>+11.2f}%")
print(f" Total trades: {total_ops:>11}")
print(f" Trade days: {trade_days:>11} / {len(filtered)}")
print()
def cmd_simulate(args):
"""Simulate day-by-day over a date range."""
market = args.market
@@ -579,16 +753,21 @@ def cmd_simulate(args):
# Compute and execute trades at close prices
trades = compute_trades(
state["holdings"], state["cash"], target_weights,
close_prices, min_trade_value=max(50, pre_value * 0.001)
close_prices, min_trade_value=max(50, pre_value * 0.001),
integer_shares=args.integer_shares
)
execute_trades(state, trades, close_prices,
tx_cost=args.tx_cost, fixed_fee=args.fixed_fee,
trade_date=day_str)
trade_date=day_str, integer_shares=args.integer_shares)
post_value = portfolio_value(state["holdings"], close_prices, state["cash"])
state["daily_equity"][day_str] = round(post_value, 2)
# Record daily snapshot (holdings + operations)
prev_eq_val = args.capital if i == 0 else list(state["daily_equity"].values())[-2] if len(state["daily_equity"]) >= 2 else args.capital
record_daily_snapshot(state, day_str, close_prices, trades, prev_eq_val)
day_trades = len(trades)
day_commission = sum(
t.get("commission", 0) for t in state["trade_log"][-day_trades:]
@@ -661,8 +840,11 @@ def cmd_monitor(args):
"""
Long-running monitor — runs in a tmux session, automatically executes daily.
Sleeps until market close + buffer, runs auto logic, then sleeps until
the next trading day. Handles weekends and holidays gracefully.
Two-phase daily schedule:
1. MORNING (~9:45 AM): Download open prices, run strategy, print orders
2. EVENING (~4:35 PM): Download close prices, execute trades, record
Sleeps between phases and between trading days. Skips weekends.
Usage:
tmux new -s quant
@@ -674,20 +856,18 @@ def cmd_monitor(args):
market = args.market
# Market schedule configuration
# Market schedule configuration — two phases per day
MARKET_CONFIG = {
"us": {
"tz": zoneinfo.ZoneInfo("America/New_York"),
"close_hour": 16, # 4:00 PM ET
"close_min": 0,
"buffer_min": 35, # Wait 35 min after close for data settlement
"open_hour": 9, "open_min": 30, "open_buffer": 15, # run at 9:45 AM ET
"close_hour": 16, "close_min": 0, "close_buffer": 35, # run at 4:35 PM ET
"label": "US (NYSE/NASDAQ)",
},
"cn": {
"tz": zoneinfo.ZoneInfo("Asia/Shanghai"),
"close_hour": 15, # 3:00 PM CST
"close_min": 0,
"buffer_min": 35,
"open_hour": 9, "open_min": 30, "open_buffer": 15, # run at 9:45 AM CST
"close_hour": 15, "close_min": 0, "close_buffer": 35, # run at 3:35 PM CST
"label": "CN (SSE/SZSE)",
},
}
@@ -698,80 +878,117 @@ def cmd_monitor(args):
return
tz = config["tz"]
run_hour = config["close_hour"]
run_min = config["close_min"] + config["buffer_min"]
# Handle minute overflow
if run_min >= 60:
run_hour += run_min // 60
run_min = run_min % 60
# Compute actual run times (hour, minute)
morn_h = config["open_hour"]
morn_m = config["open_min"] + config["open_buffer"]
if morn_m >= 60:
morn_h += morn_m // 60
morn_m = morn_m % 60
eve_h = config["close_hour"]
eve_m = config["close_min"] + config["close_buffer"]
if eve_m >= 60:
eve_h += eve_m // 60
eve_m = eve_m % 60
print(f"\n{'='*60}")
print(f" MONITOR MODE — {config['label']}")
print(f" Strategy: {args.strategy} | Market: {market.upper()}")
print(f" Daily run at: {run_hour:02d}:{run_min:02d} {tz}")
print(f" Morning (open → signals): {morn_h:02d}:{morn_m:02d} {tz}")
print(f" Evening (close → execute): {eve_h:02d}:{eve_m:02d} {tz}")
print(f" Capital: ${args.capital:,.0f}")
print(f"{'='*60}")
print(f"[monitor] Started at {datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S %Z')}")
print(f"[monitor] Press Ctrl+C to stop\n")
def _next_run_time(now):
"""Compute the next execution datetime (skip weekends)."""
# Today's run time
target = now.replace(hour=run_hour, minute=run_min, second=0, microsecond=0)
def _next_event(now):
"""Return (target_datetime, phase) for the next event to run.
if now >= target:
# Already past today's run time, schedule for tomorrow
target += timedelta(days=1)
phase is 'morning' or 'evening'.
"""
today_morning = now.replace(hour=morn_h, minute=morn_m, second=0, microsecond=0)
today_evening = now.replace(hour=eve_h, minute=eve_m, second=0, microsecond=0)
# Skip weekends: Saturday=5, Sunday=6
candidates = []
# Today's morning (if not past)
if now < today_morning:
candidates.append((today_morning, "morning"))
# Today's evening (if not past)
if now < today_evening:
candidates.append((today_evening, "evening"))
# If nothing left today, schedule tomorrow's morning
if not candidates:
tomorrow = now + timedelta(days=1)
tom_morning = tomorrow.replace(hour=morn_h, minute=morn_m, second=0, microsecond=0)
candidates.append((tom_morning, "morning"))
# Pick the earliest
target, phase = min(candidates, key=lambda x: x[0])
# Skip weekends
while target.weekday() >= 5:
target += timedelta(days=1)
return target
return target, phase
while True:
now = datetime.now(tz)
next_run = _next_run_time(now)
wait_seconds = (next_run - now).total_seconds()
print(f"[monitor] {now.strftime('%Y-%m-%d %H:%M:%S')}"
f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S %Z')} "
f"(in {wait_seconds/3600:.1f}h)")
# Sleep in chunks so Ctrl+C is responsive and we print heartbeats
def _sleep_until(target):
"""Sleep in chunks until target time. Print hourly heartbeats."""
while True:
now = datetime.now(tz)
remaining = (next_run - now).total_seconds()
remaining = (target - now).total_seconds()
if remaining <= 0:
break
# Sleep in 15-minute chunks, print heartbeat every hour
chunk = min(remaining, 900) # 15 min
chunk = min(remaining, 900) # 15-min chunks
_time.sleep(chunk)
now = datetime.now(tz)
remaining = (next_run - now).total_seconds()
remaining = (target - now).total_seconds()
if remaining > 60:
hours_left = remaining / 3600
if int(remaining) % 3600 < 900: # ~on the hour
if int(remaining) % 3600 < 900:
print(f"[monitor] {now.strftime('%H:%M:%S')}"
f"waiting... ({hours_left:.1f}h until run)")
f"waiting... ({hours_left:.1f}h until next)")
while True:
now = datetime.now(tz)
target, phase = _next_event(now)
wait_seconds = (target - now).total_seconds()
print(f"[monitor] {now.strftime('%Y-%m-%d %H:%M:%S')}"
f"Next: {phase.upper()} at {target.strftime('%Y-%m-%d %H:%M:%S %Z')} "
f"(in {wait_seconds/3600:.1f}h)")
_sleep_until(target)
# Time to run!
now = datetime.now(tz)
print(f"\n[monitor] {'='*55}")
print(f"[monitor] Executing daily run at {now.strftime('%Y-%m-%d %H:%M:%S %Z')}")
print(f"[monitor] {phase.upper()} phase at "
f"{now.strftime('%Y-%m-%d %H:%M:%S %Z')}")
print(f"[monitor] {'='*55}")
try:
cmd_auto(args)
print(f"[monitor] Daily run completed successfully.")
if phase == "morning":
cmd_morning(args)
else:
# Check if morning was run today; if not, use auto (combined)
state = load_state(market, args.strategy)
today_str = now.strftime("%Y-%m-%d")
if state and state.get("last_morning") == today_str and state.get("pending_trades"):
cmd_evening(args)
else:
print(f"[monitor] Morning was not run today — using auto (combined) mode")
cmd_auto(args)
print(f"[monitor] {phase.upper()} completed successfully.")
except KeyboardInterrupt:
raise
except Exception as e:
print(f"[monitor] ERROR during daily run: {e}")
print(f"[monitor] ERROR during {phase}: {e}")
import traceback
traceback.print_exc()
print(f"[monitor] Will retry on next scheduled run.")
print(f"[monitor] Will continue to next scheduled event.")
print()
@@ -837,15 +1054,22 @@ def cmd_auto(args):
# --- EVENING PHASE: Execute trades at close prices ---
trades = compute_trades(
state["holdings"], state["cash"], target_weights,
close_prices, min_trade_value=max(50, pre_value * 0.001)
close_prices, min_trade_value=max(50, pre_value * 0.001),
integer_shares=args.integer_shares
)
execute_trades(state, trades, close_prices,
tx_cost=args.tx_cost, fixed_fee=args.fixed_fee,
trade_date=today_str)
trade_date=today_str, integer_shares=args.integer_shares)
post_value = portfolio_value(state["holdings"], close_prices, state["cash"])
state["daily_equity"][today_str] = round(post_value, 2)
# Record daily snapshot
eq_vals = list(state["daily_equity"].values())
prev_eq = eq_vals[-2] if len(eq_vals) >= 2 else state["initial_capital"]
record_daily_snapshot(state, today_str, close_prices, trades, prev_eq)
state["last_morning"] = today_str
state["last_evening"] = today_str
state["pending_trades"] = None
@@ -897,6 +1121,8 @@ def main():
help="Proportional transaction cost (default: 0.001 = 10bps)")
p.add_argument("--fixed-fee", type=float, default=0.0,
help="Fixed dollar fee per trade")
p.add_argument("--integer-shares", action="store_true", default=False,
help="Only trade whole shares (no fractional)")
# Morning
p_morning = sub.add_parser("morning", help="Generate trade orders from open prices")
@@ -924,6 +1150,7 @@ def main():
p_status.add_argument("--capital", type=float, default=100_000)
p_status.add_argument("--tx-cost", type=float, default=0.001)
p_status.add_argument("--fixed-fee", type=float, default=0.0)
p_status.add_argument("--integer-shares", action="store_true", default=False)
# Simulate
p_sim = sub.add_parser("simulate", help="Simulate over a date range")
@@ -931,6 +1158,14 @@ def main():
p_sim.add_argument("--start", required=True, help="Start date (YYYY-MM-DD)")
p_sim.add_argument("--end", required=True, help="End date (YYYY-MM-DD)")
# Log viewer
p_log = sub.add_parser("log", help="View daily log (holdings + operations per day)")
p_log.add_argument("--market", choices=UNIVERSES.keys(), default="us")
p_log.add_argument("--strategy", default="sim_recovery_mom_top10",
help="Strategy name (e.g. sim_recovery_mom_top10)")
p_log.add_argument("--start", default=None, help="Start date filter (YYYY-MM-DD)")
p_log.add_argument("--end", default=None, help="End date filter (YYYY-MM-DD)")
args = parser.parse_args()
if args.command == "morning":
@@ -945,6 +1180,8 @@ def main():
cmd_status(args)
elif args.command == "simulate":
cmd_simulate(args)
elif args.command == "log":
cmd_log(args)
if __name__ == "__main__":