Add interaction screening matrix generator

This commit is contained in:
2026-07-01 14:28:34 +08:00
parent 46b477f48e
commit d8899c50ce
3 changed files with 662 additions and 0 deletions

View File

@@ -0,0 +1,221 @@
from __future__ import annotations
import contextlib
import io
import json
import tempfile
import unittest
from pathlib import Path
from aituner.cli import main as cli_main
from aituner.interaction_matrix import build_interaction_screening_matrix
from aituner.spec import load_study_spec
from aituner.trace import load_trace_requests, summarize_window
def _write_interaction_study(tmp_path: Path) -> Path:
trace_dir = tmp_path / "trace_windows" / "traces"
trace_dir.mkdir(parents=True)
trace_path = trace_dir / "chat_w1.jsonl"
rows = []
for idx in range(24):
rows.append(
{
"request_id": f"r{idx}",
"timestamp": float(idx // 4),
"sampling_u": 0.1 + idx * 0.01,
"messages": [{"role": "user", "content": "hello"}],
"input_length": 7000 if idx < 20 else 9000,
"output_length": 64,
}
)
with trace_path.open("w", encoding="utf-8") as handle:
for row in rows:
handle.write(json.dumps(row) + "\n")
windows_path = tmp_path / "trace_windows" / "windows.json"
windows_path.write_text(
json.dumps(
{
"u_field": "sampling_u",
"windows": [
{
"window_id": "chat_w1",
"trace_type": "chat",
"trace_file": "traces/chat_w1.jsonl",
"window_start": 0.0,
"window_end": 6.0,
}
],
}
),
encoding="utf-8",
)
study_path = tmp_path / "study.json"
study_path.write_text(
json.dumps(
{
"study_id": "interaction-study",
"hardware": {
"gpu_count": 8,
"gpu_model": "H20",
"host_candidates": ["dash1"],
},
"model": {
"model_id": "qwen",
"served_model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507",
},
"engine": {
"engine_name": "vllm",
"engine_version": "0.20",
"exec_path": "/usr/local/bin/vllm",
"cwd": str(tmp_path),
"host": "127.0.0.1",
"port": 8000,
"healthcheck_path": "/v1/models",
"ready_timeout_s": 30,
"request_timeout_s": 30,
"launch_args": ["serve", "/models/qwen"],
"base_envs": {},
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 2,
"gpu-memory-utilization": 0.7,
},
"tunable_envs": [],
"tunable_flags": [
"tensor-parallel-size",
"gpu-memory-utilization",
"max-num-seqs",
"max-num-batched-tokens",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
"allowed_tp_dp_products": [1, 2, 4, 8],
},
"python_executable": "python3",
},
"trace": {
"windows_path": str(windows_path),
"window_id": "chat_w1",
"u_field": "sampling_u",
"timestamp_field": "timestamp",
"max_concurrency": 64,
},
"slo": {
"target_pass_rate": 0.95,
"ttft_rule": {"kind": "fixed_ms", "threshold_ms": 5000},
"tpot_rule": {"kind": "fixed_ms", "threshold_ms": 120},
},
"search": {
"low": 0.0,
"high": 1.0,
"tolerance": 0.01,
"max_probes": 8,
"sample_seed": 20260325,
},
"llm": {"system_prompt": "Tune it.", "max_history_trials": 8},
}
),
encoding="utf-8",
)
return study_path
class InteractionMatrixTests(unittest.TestCase):
    """Checks for the interaction screening matrix and its CLI entry point."""

    @staticmethod
    def _matrix_for(study_path: Path):
        """Load the study at ``study_path`` and build its screening matrix.

        The spec is loaded, the trace requests for its window are read, and
        the window summary is handed to the matrix builder.
        """
        spec = load_study_spec(study_path)
        window, requests = load_trace_requests(spec, study_spec_path=study_path)
        return build_interaction_screening_matrix(
            study=spec,
            window_summary=summarize_window(requests, window),
        )

    def test_screening_matrix_uses_normalized_axis_scales(self) -> None:
        """The default fixture yields eight configs over the tp/mns/mbt axes."""
        expected_tp_levels = {
            "low": {"tensor-parallel-size": 2},
            "high": {"tensor-parallel-size": 4},
        }
        all_high_patch = {
            "tensor-parallel-size": 4,
            "max-num-seqs": 16,
            "max-num-batched-tokens": 32768,
        }
        with tempfile.TemporaryDirectory() as workdir:
            matrix = self._matrix_for(_write_interaction_study(Path(workdir)))

            self.assertEqual(len(matrix["configs"]), 8)
            axes = matrix["axes"]
            self.assertEqual(axes["tp"]["levels"], expected_tp_levels)
            self.assertEqual(axes["mns"]["levels"], {"low": 8, "high": 16})
            self.assertEqual(axes["mbt"]["levels"], {"low": 9216, "high": 32768})
            # gpu-memory-utilization is tunable in the fixture but is not a
            # matrix axis, so it is expected among the fixed base flags.
            self.assertEqual(
                matrix["fixed_tunable_base_flags"],
                {"gpu-memory-utilization": 0.7},
            )

            flag_patches = []
            for config in matrix["configs"]:
                flag_patches.append(config["config_patch"]["flag_patch"])
            self.assertIn(all_high_patch, flag_patches)

    def test_screening_matrix_can_use_tp_dp_redistribution(self) -> None:
        """With a tunable data-parallel size the tp axis covers TP and DP."""
        expected_tp_levels = {
            "low": {"tensor-parallel-size": 2, "data-parallel-size": 4},
            "high": {"tensor-parallel-size": 4, "data-parallel-size": 2},
        }
        all_high_patch = {
            "tensor-parallel-size": 4,
            "data-parallel-size": 2,
            "max-num-seqs": 16,
            "max-num-batched-tokens": 32768,
        }
        with tempfile.TemporaryDirectory() as workdir:
            study_path = _write_interaction_study(Path(workdir))

            # Rewrite the fixture in place: make data-parallel-size tunable
            # and swap in the topology constraints this test exercises.
            spec_doc = json.loads(study_path.read_text(encoding="utf-8"))
            engine = spec_doc["engine"]
            engine["base_flags"]["data-parallel-size"] = 2
            engine["tunable_flags"].append("data-parallel-size")
            engine["topology_constraints"] = {
                "require_tp_dp_product_equals_gpu_count": True,
                "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
                "allowed_data_parallel_sizes": [1, 2, 4, 8],
            }
            study_path.write_text(json.dumps(spec_doc), encoding="utf-8")

            matrix = self._matrix_for(study_path)

            tp_axis = matrix["axes"]["tp"]
            self.assertEqual(tp_axis["knob"], "topology")
            self.assertEqual(matrix["axes"]["tp"]["levels"], expected_tp_levels)
            self.assertNotIn(
                "data-parallel-size",
                matrix["fixed_tunable_base_flags"],
            )
            flag_patches = [
                config["config_patch"]["flag_patch"]
                for config in matrix["configs"]
            ]
            self.assertIn(all_high_patch, flag_patches)

    def test_cli_profile_interaction_matrix_prints_json(self) -> None:
        """``profile interaction-matrix`` exits 0 and prints the matrix JSON."""
        with tempfile.TemporaryDirectory() as workdir:
            study_path = _write_interaction_study(Path(workdir))
            argv = ["profile", "interaction-matrix", "--spec", str(study_path)]

            captured = io.StringIO()
            with contextlib.redirect_stdout(captured):
                exit_code = cli_main(argv)

            self.assertEqual(exit_code, 0)
            report = json.loads(captured.getvalue())
            self.assertEqual(report["matrix_kind"], "low_high_factorial_screening")
            self.assertEqual(report["source"]["window_id"], "chat_w1")
            self.assertEqual(len(report["configs"]), 8)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()