Single-process monitor for US + CN markets

- `trader.py monitor` now handles all markets in one process
  (US 9:45/16:35 ET, CN 9:45/15:35 CST) with unified UTC event loop
- Events from different markets/timezones are merged; sleeps until
  the globally next event across all markets
- Overlapping events (e.g. US evening + CN morning) fire together
- `trader.py compare` defaults to US, use --market cn for A-shares
- One tmux session handles everything: just `uv run python trader.py monitor`

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-07 00:16:46 +08:00
parent 4aa17c7e98
commit 14ec64c1da
2 changed files with 154 additions and 112 deletions

View File

@@ -23,7 +23,8 @@ uv run python main.py --execution open-close # Signal on open, execute at close
uv run python trader.py auto --market us --strategy recovery_mom_top10 # Single daily run (for cron) uv run python trader.py auto --market us --strategy recovery_mom_top10 # Single daily run (for cron)
uv run python trader.py morning --market us --strategy recovery_mom_top10 # Morning: generate orders uv run python trader.py morning --market us --strategy recovery_mom_top10 # Morning: generate orders
uv run python trader.py evening --market us --strategy recovery_mom_top10 # Evening: record execution uv run python trader.py evening --market us --strategy recovery_mom_top10 # Evening: record execution
uv run python trader.py monitor --market us --strategy recovery_mom_top10 # Long-running daemon (for tmux) uv run python trader.py monitor # Monitor ALL markets & strategies (for tmux)
uv run python trader.py monitor --market us # Monitor US only
uv run python trader.py status --market us --strategy recovery_mom_top10 # Portfolio status uv run python trader.py status --market us --strategy recovery_mom_top10 # Portfolio status
uv run python trader.py simulate --market us --strategy recovery_mom_top10 --start 2026-01-01 --end 2026-04-01 # Historical replay uv run python trader.py simulate --market us --strategy recovery_mom_top10 --start 2026-01-01 --end 2026-04-01 # Historical replay
uv run python trader.py log --market us --strategy sim_recovery_mom_top10 --start 2026-03-01 # View daily log uv run python trader.py log --market us --strategy sim_recovery_mom_top10 --start 2026-03-01 # View daily log
@@ -73,12 +74,16 @@ No test suite or linter is configured.
### Option 1: tmux monitor (recommended) ### Option 1: tmux monitor (recommended)
Two-phase daily schedule — morning (open prices → generate signals) and evening (close prices → execute trades): Single process monitors both US and CN markets. Two phases per market daily — morning (open → signals) and evening (close → execute). All 9 strategies run for each market.
```bash ```bash
tmux new -s quant tmux new -s quant
uv run python trader.py monitor --market us --strategy recovery_mom_top10 uv run python trader.py monitor # Both US + CN, all strategies
# Ctrl-B D to detach; tmux attach -t quant to reconnect # Ctrl-B D to detach; tmux attach -t quant to reconnect
# After a month, compare:
uv run python trader.py compare # US strategies
uv run python trader.py compare --market cn # CN strategies
``` ```
If monitor starts mid-day (after open, before close), it automatically falls back to `auto` mode for the evening phase. If monitor starts mid-day (after open, before close), it automatically falls back to `auto` mode for the evening phase.

255
trader.py
View File

@@ -5,10 +5,11 @@ Subcommands:
morning — Download today's open prices, run strategy, output trade orders morning — Download today's open prices, run strategy, output trade orders
evening — Download close prices, record execution, update portfolio evening — Download close prices, record execution, update portfolio
auto — Single daily run (morning + evening in one step, for cron) auto — Single daily run (morning + evening in one step, for cron)
monitor — Long-running daemon that auto-runs daily after market close (for tmux) monitor — Long-running daemon for US+CN, all strategies (one tmux)
status — Print current portfolio, P&L, and recent trades status — Print current portfolio, P&L, and recent trades
simulate — Replay a date range day-by-day (for forward testing) simulate — Replay a date range day-by-day (for forward testing)
log — View daily log: each day's holdings, cash, and operations log — View daily log: each day's holdings, cash, and operations
compare — Compare all strategies side-by-side (auto-discovers state files)
Usage: Usage:
uv run python trader.py morning --market us --strategy recovery_mom_top10 uv run python trader.py morning --market us --strategy recovery_mom_top10
@@ -987,24 +988,20 @@ def cmd_monitor(args):
""" """
Long-running monitor — runs in a tmux session, automatically executes daily. Long-running monitor — runs in a tmux session, automatically executes daily.
Two-phase daily schedule: Manages ALL markets (US + CN) in a single process. Each market has its
1. MORNING (~9:45 AM): Download open prices, run strategy, print orders own timezone and two daily phases (morning + evening). All strategies
2. EVENING (~4:35 PM): Download close prices, execute trades, record run for every market.
Supports multiple strategies — each gets its own state file and runs
independently at each phase.
Usage: Usage:
tmux new -s quant tmux new -s quant
uv run python trader.py monitor --market us \\ uv run python trader.py monitor
--strategy recovery_mom_top10 momentum dual_momentum
# Ctrl-B D to detach # Ctrl-B D to detach
""" """
import copy import copy
import time as _time import time as _time
import zoneinfo import zoneinfo
market = args.market markets = args.market # list of markets
strategies = args.strategy # list of strategy names strategies = args.strategy # list of strategy names
# Market schedule configuration — two phases per day # Market schedule configuration — two phases per day
@@ -1023,133 +1020,170 @@ def cmd_monitor(args):
}, },
} }
config = MARKET_CONFIG.get(market) # Compute run times for each market
if not config: market_schedules = {}
print(f"[monitor] Unknown market '{market}'. Supported: {list(MARKET_CONFIG.keys())}") for mkt in markets:
return cfg = MARKET_CONFIG[mkt]
tz = cfg["tz"]
tz = config["tz"] morn_h = cfg["open_hour"]
morn_m = cfg["open_min"] + cfg["open_buffer"]
# Compute actual run times (hour, minute) if morn_m >= 60:
morn_h = config["open_hour"] morn_h += morn_m // 60
morn_m = config["open_min"] + config["open_buffer"] morn_m = morn_m % 60
if morn_m >= 60: eve_h = cfg["close_hour"]
morn_h += morn_m // 60 eve_m = cfg["close_min"] + cfg["close_buffer"]
morn_m = morn_m % 60 if eve_m >= 60:
eve_h += eve_m // 60
eve_h = config["close_hour"] eve_m = eve_m % 60
eve_m = config["close_min"] + config["close_buffer"] market_schedules[mkt] = {
if eve_m >= 60: "tz": tz, "label": cfg["label"],
eve_h += eve_m // 60 "morn_h": morn_h, "morn_m": morn_m,
eve_m = eve_m % 60 "eve_h": eve_h, "eve_m": eve_m,
}
# Banner
print(f"\n{'='*60}") print(f"\n{'='*60}")
print(f" MONITOR MODE — {config['label']}") print(f" MONITOR MODE — {len(markets)} market(s), "
print(f" Market: {market.upper()} | Capital: ${args.capital:,.0f}") f"{len(strategies)} strategies each")
print(f" Strategies ({len(strategies)}):") print(f" Capital: ${args.capital:,.0f} | "
for s in strategies: f"Fee: ${args.fixed_fee:.2f}/trade | "
print(f"{s}") f"Integer shares: {args.integer_shares}")
print(f" Morning (open → signals): {morn_h:02d}:{morn_m:02d} {tz}") for mkt, sched in market_schedules.items():
print(f" Evening (close → execute): {eve_h:02d}:{eve_m:02d} {tz}") print(f" {sched['label']}:")
print(f" Morning: {sched['morn_h']:02d}:{sched['morn_m']:02d} {sched['tz']}")
print(f" Evening: {sched['eve_h']:02d}:{sched['eve_m']:02d} {sched['tz']}")
print(f" Strategies: {', '.join(strategies)}")
print(f"{'='*60}") print(f"{'='*60}")
print(f"[monitor] Started at {datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S %Z')}")
# Use UTC as common reference for sleeping
utc = zoneinfo.ZoneInfo("UTC")
print(f"[monitor] Started at {datetime.now(utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
print(f"[monitor] Press Ctrl+C to stop\n") print(f"[monitor] Press Ctrl+C to stop\n")
def _next_event(now): def _next_events_for_market(mkt, now_utc):
"""Return (target_datetime, phase) for the next event to run. """Return list of (utc_datetime, market, phase) for next events."""
sched = market_schedules[mkt]
phase is 'morning' or 'evening'. tz = sched["tz"]
""" now_local = now_utc.astimezone(tz)
today_morning = now.replace(hour=morn_h, minute=morn_m, second=0, microsecond=0)
today_evening = now.replace(hour=eve_h, minute=eve_m, second=0, microsecond=0)
candidates = [] candidates = []
# Today's morning (if not past) for phase, h, m in [("morning", sched["morn_h"], sched["morn_m"]),
if now < today_morning: ("evening", sched["eve_h"], sched["eve_m"])]:
candidates.append((today_morning, "morning")) target = now_local.replace(hour=h, minute=m, second=0, microsecond=0)
# Today's evening (if not past) if now_local >= target:
if now < today_evening: target += timedelta(days=1)
candidates.append((today_evening, "evening")) # Skip weekends
while target.weekday() >= 5:
target += timedelta(days=1)
candidates.append((target.astimezone(utc), mkt, phase))
return candidates
# If nothing left today, schedule tomorrow's morning def _next_event(now_utc):
if not candidates: """Find the globally next event across all markets."""
tomorrow = now + timedelta(days=1) all_candidates = []
tom_morning = tomorrow.replace(hour=morn_h, minute=morn_m, second=0, microsecond=0) for mkt in markets:
candidates.append((tom_morning, "morning")) all_candidates.extend(_next_events_for_market(mkt, now_utc))
return min(all_candidates, key=lambda x: x[0])
# Pick the earliest def _run_phase(market, phase, now_utc):
target, phase = min(candidates, key=lambda x: x[0]) """Run all strategies for a market/phase."""
sched = market_schedules[market]
tz = sched["tz"]
now_local = now_utc.astimezone(tz)
# Skip weekends
while target.weekday() >= 5:
target += timedelta(days=1)
return target, phase
def _sleep_until(target):
"""Sleep in chunks until target time. Print hourly heartbeats."""
while True:
now = datetime.now(tz)
remaining = (target - now).total_seconds()
if remaining <= 0:
break
chunk = min(remaining, 900) # 15-min chunks
_time.sleep(chunk)
now = datetime.now(tz)
remaining = (target - now).total_seconds()
if remaining > 60:
hours_left = remaining / 3600
if int(remaining) % 3600 < 900:
print(f"[monitor] {now.strftime('%H:%M:%S')}"
f"waiting... ({hours_left:.1f}h until next)")
while True:
now = datetime.now(tz)
target, phase = _next_event(now)
wait_seconds = (target - now).total_seconds()
print(f"[monitor] {now.strftime('%Y-%m-%d %H:%M:%S')}"
f"Next: {phase.upper()} at {target.strftime('%Y-%m-%d %H:%M:%S %Z')} "
f"(in {wait_seconds/3600:.1f}h)")
_sleep_until(target)
# Time to run!
now = datetime.now(tz)
print(f"\n[monitor] {'='*55}") print(f"\n[monitor] {'='*55}")
print(f"[monitor] {phase.upper()} phase at " print(f"[monitor] {market.upper()} {phase.upper()} at "
f"{now.strftime('%Y-%m-%d %H:%M:%S %Z')}") f"{now_local.strftime('%Y-%m-%d %H:%M:%S %Z')}")
print(f"[monitor] {'='*55}") print(f"[monitor] {'='*55}")
for strat_name in strategies: for strat_name in strategies:
# Create a single-strategy args copy for each sub-command
sub_args = copy.copy(args) sub_args = copy.copy(args)
sub_args.strategy = strat_name sub_args.strategy = strat_name
sub_args.market = market
print(f"\n[monitor] --- {strat_name} ---") print(f"\n[monitor] --- {market.upper()}:{strat_name} ---")
try: try:
if phase == "morning": if phase == "morning":
cmd_morning(sub_args) cmd_morning(sub_args)
else: else:
# Check if morning was run today; if not, use auto (combined)
state = load_state(market, strat_name) state = load_state(market, strat_name)
today_str = now.strftime("%Y-%m-%d") today_str = now_local.strftime("%Y-%m-%d")
if state and state.get("last_morning") == today_str and state.get("pending_trades"): if (state and state.get("last_morning") == today_str
and state.get("pending_trades")):
cmd_evening(sub_args) cmd_evening(sub_args)
else: else:
print(f"[monitor] Morning not run for {strat_name} — using auto mode") print(f"[monitor] Morning not run for "
f"{market.upper()}:{strat_name} — using auto")
cmd_auto(sub_args) cmd_auto(sub_args)
print(f"[monitor] {strat_name} {phase} OK") print(f"[monitor] {market.upper()}:{strat_name} {phase} OK")
except KeyboardInterrupt: except KeyboardInterrupt:
raise raise
except Exception as e: except Exception as e:
print(f"[monitor] ERROR in {strat_name} {phase}: {e}") print(f"[monitor] ERROR {market.upper()}:{strat_name} "
f"{phase}: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
print(f"[monitor] Continuing with next strategy...")
print(f"\n[monitor] All {len(strategies)} strategies processed.") print(f"[monitor] {market.upper()} {phase} done — "
f"{len(strategies)} strategies")
while True:
now_utc = datetime.now(utc)
target_utc, next_mkt, next_phase = _next_event(now_utc)
wait_seconds = (target_utc - now_utc).total_seconds()
next_tz = market_schedules[next_mkt]["tz"]
target_local = target_utc.astimezone(next_tz)
print(f"[monitor] {now_utc.strftime('%Y-%m-%d %H:%M UTC')}"
f"Next: {next_mkt.upper()} {next_phase.upper()} at "
f"{target_local.strftime('%H:%M %Z %m/%d')} "
f"(in {wait_seconds/3600:.1f}h)")
# Sleep in chunks
while True:
now_utc = datetime.now(utc)
remaining = (target_utc - now_utc).total_seconds()
if remaining <= 0:
break
chunk = min(remaining, 900)
_time.sleep(chunk)
now_utc = datetime.now(utc)
remaining = (target_utc - now_utc).total_seconds()
if remaining > 60:
hours_left = remaining / 3600
if int(remaining) % 3600 < 900:
print(f"[monitor] {now_utc.strftime('%H:%M UTC')}"
f"waiting... ({hours_left:.1f}h until "
f"{next_mkt.upper()} {next_phase})")
# Execute
now_utc = datetime.now(utc)
# Run all events that are due NOW (within 2 min window) — handles
# case where US evening and CN morning overlap
for mkt in markets:
for phase_check in ("morning", "evening"):
sched = market_schedules[mkt]
tz = sched["tz"]
now_local = now_utc.astimezone(tz)
if phase_check == "morning":
h, m = sched["morn_h"], sched["morn_m"]
else:
h, m = sched["eve_h"], sched["eve_m"]
target_local = now_local.replace(
hour=h, minute=m, second=0, microsecond=0)
diff = abs((now_local - target_local).total_seconds())
# Fire if within 2-minute window and it's a weekday
if diff < 120 and now_local.weekday() < 5:
try:
_run_phase(mkt, phase_check, now_utc)
except KeyboardInterrupt:
raise
except Exception as e:
print(f"[monitor] ERROR in {mkt} {phase_check}: {e}")
import traceback
traceback.print_exc()
print() print()
@@ -1298,10 +1332,13 @@ def main():
help="Automated daily run: signal + execute in one step (for cron)") help="Automated daily run: signal + execute in one step (for cron)")
add_common(p_auto) add_common(p_auto)
# Monitor (long-running daemon for tmux) — supports multiple strategies # Monitor (long-running daemon for tmux) — all markets + all strategies
p_monitor = sub.add_parser("monitor", p_monitor = sub.add_parser("monitor",
help="Long-running daemon: runs ALL strategies daily (for tmux)") help="Long-running daemon: runs ALL markets & strategies (for tmux)")
p_monitor.add_argument("--market", choices=UNIVERSES.keys(), default="us") p_monitor.add_argument("--market", nargs="+",
choices=list(UNIVERSES.keys()),
default=list(UNIVERSES.keys()),
help="Markets to monitor (default: ALL)")
p_monitor.add_argument("--strategy", nargs="+", p_monitor.add_argument("--strategy", nargs="+",
choices=list(STRATEGY_REGISTRY.keys()), choices=list(STRATEGY_REGISTRY.keys()),
default=list(STRATEGY_REGISTRY.keys()), default=list(STRATEGY_REGISTRY.keys()),