Add interaction screening matrix generator

This commit is contained in:
2026-07-01 14:28:34 +08:00
parent 46b477f48e
commit 8a3c0d5f4c
3 changed files with 507 additions and 0 deletions

View File

@@ -18,6 +18,7 @@ from .harness import (
build_harness_guided_proposal,
build_harness_stop_proposal,
)
from .interaction_matrix import build_interaction_screening_matrix
from .job import append_job, build_trial_job
from .lca import (
build_study_workload_profile,
@@ -1169,6 +1170,18 @@ def cmd_profile_similarity(args: argparse.Namespace) -> int:
return 0
def cmd_profile_interaction_matrix(args: argparse.Namespace) -> int:
    """Print the interaction screening matrix for a study spec as JSON.

    Resolves ``args.spec``, loads the study and its trace window requests,
    summarizes the window, builds the matrix, and writes it to stdout.

    Returns:
        Always ``0``.
    """
    resolved_spec = Path(args.spec).resolve()
    profile_study = _load_profile_study_spec(resolved_spec)
    trace_window, trace_requests = load_trace_requests(
        profile_study,
        study_spec_path=resolved_spec,
    )
    summary = summarize_window(trace_requests, trace_window)
    payload = build_interaction_screening_matrix(
        study=profile_study,
        window_summary=summary,
    )
    # ensure_ascii=False keeps any non-ASCII text readable in the printed JSON.
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="AITuner CLI")
subparsers = parser.add_subparsers(dest="command", required=True)
@@ -1290,6 +1303,10 @@ def build_parser() -> argparse.ArgumentParser:
)
profile_similarity.set_defaults(func=cmd_profile_similarity)
profile_interaction = profile_sub.add_parser("interaction-matrix")
profile_interaction.add_argument("--spec", required=True)
profile_interaction.set_defaults(func=cmd_profile_interaction_matrix)
return parser

View File

@@ -0,0 +1,314 @@
from __future__ import annotations
import itertools
import math
from typing import Any
from .spec import SpecError, StudySpec
# Engine flags varied by the screening matrix. Each one must appear in the
# study's tunable flags (enforced by _require_axis_knobs).
AXIS_KNOBS = (
    "tensor-parallel-size",
    "max-num-seqs",
    "max-num-batched-tokens",
)
def build_interaction_screening_matrix(
    *,
    study: StudySpec,
    window_summary: dict[str, Any],
) -> dict[str, Any]:
    """Materialize a 2x2x2 low/high screening plan over TP, MNS, and MBT.

    Nothing is launched here. The result describes the axis levels, the eight
    corner config patches, the corners recommended for repeats, and the
    intended analysis, so the plan can be reviewed before any run.

    Args:
        study: Study spec providing engine flags, hardware, and trace settings.
        window_summary: Summary statistics of the trace window. Keys that are
            absent are echoed as ``None`` in the returned ``window_summary``.

    Returns:
        The screening-matrix payload as a plain dict.

    Raises:
        SpecError: If an axis knob is not tunable, or fewer than two legal TP
            levels exist with data-parallel-size fixed.
    """
    _require_axis_knobs(study)
    axes = {
        "tp": _tp_axis(study),
        "mns": _mns_axis(study, window_summary),
        "mbt": _mbt_axis(window_summary),
    }
    configs = _corner_configs(axes)

    source = {
        "window_id": study.trace.window_id,
        "request_mode": study.trace.request_mode,
        "engine_name": study.engine.engine_name,
        "engine_version": study.engine.engine_version,
        "hardware_gpu_count": study.hardware.gpu_count,
    }

    echoed_keys = (
        "request_count",
        "duration_s",
        "request_rate",
        "prompt_tokens_p50",
        "prompt_tokens_p95",
        "prompt_tokens_p99",
        "arrival_qps_1s_p50",
        "arrival_qps_1s_p95",
        "arrival_burst_ratio_p95_to_mean",
    )
    echoed_summary = {name: window_summary.get(name) for name in echoed_keys}

    # The corner list starts at the all-low corner and ends at the all-high one.
    first_corner, last_corner = configs[0], configs[-1]
    recommended_repeats = [
        {
            "config_id": first_corner["config_id"],
            "reason": "Estimate low-capacity corner noise and baseline drift.",
        },
        {
            "config_id": last_corner["config_id"],
            "reason": "Estimate high-capacity joint-corner noise.",
        },
    ]

    analysis_plan = {
        "primary_metric": "max feasible request_rate_per_gpu at target pass_rate",
        "plot_1": (
            "2x2 MNS/MBT corner heatmaps faceted by TP low/high, colored by "
            "request_rate_per_gpu and hatched when infeasible."
        ),
        "plot_2": (
            "Conditional-effect lines: MNS low/high on x-axis, one line per "
            "TP, faceted by MBT low/high."
        ),
        "plot_3": (
            "Interaction-contrast bars for TPxMNS, TPxMBT, MNSxMBT, and "
            "TPxMNSxMBT normalized by the low/low/low corner."
        ),
    }

    review_notes = [
        "This matrix is for hypothesis testing only; it does not use LLM or harness tuning.",
        "All non-axis tunable base flags remain fixed and are listed in fixed_tunable_base_flags.",
        "If a high level is launch-unsafe in a real run, lower it to the largest launch-safe value and keep the blocked corner in the report.",
    ]

    return {
        "schema_version": 1,
        "matrix_kind": "low_high_factorial_screening",
        "study_id": study.study_id,
        "objective": (
            "Estimate whether TP, max-num-seqs, and max-num-batched-tokens "
            "have non-separable effects on feasible request_rate_per_gpu."
        ),
        "source": source,
        "window_summary": echoed_summary,
        "axes": axes,
        "fixed_tunable_base_flags": _fixed_tunable_base_flags(study),
        "configs": configs,
        "recommended_repeats": recommended_repeats,
        "analysis_plan": analysis_plan,
        "review_notes": review_notes,
    }
def _require_axis_knobs(study: StudySpec) -> None:
    """Ensure every knob in ``AXIS_KNOBS`` is tunable for this study.

    Raises:
        SpecError: If any axis knob is absent from
            ``study.engine.tunable_flags``. The message lists the required
            knobs and the missing ones, in ``AXIS_KNOBS`` order.
    """
    declared = set(study.engine.tunable_flags)
    absent = [name for name in AXIS_KNOBS if name not in declared]
    if not absent:
        return
    required_text = ", ".join(AXIS_KNOBS)
    absent_text = ", ".join(absent)
    raise SpecError(
        f"interaction screening requires tunable flags: {required_text}. "
        f"Missing: {absent_text}."
    )
def _tp_axis(study: StudySpec) -> dict[str, Any]:
    """Pick the low/high tensor-parallel levels for the screen.

    Candidates come from the topology constraints' allowed TP sizes when any
    are given, otherwise from 1, 2, 4, 8. They are de-duplicated, sorted, and
    filtered for legality with data-parallel-size held at its base value; an
    adjacent pair around the base TP is then selected.

    Raises:
        SpecError: If fewer than two candidates are legal.
    """
    flags = study.engine.base_flags
    base_tp = _int_flag(flags.get("tensor-parallel-size"), default=1)
    base_dp = _int_flag(flags.get("data-parallel-size"), default=1)

    topology = study.engine.topology_constraints
    candidates = [1, 2, 4, 8]
    if topology is not None and topology.allowed_tensor_parallel_sizes:
        candidates = topology.allowed_tensor_parallel_sizes

    legal = []
    for candidate in sorted(set(candidates)):
        size = int(candidate)
        if _legal_tp_with_fixed_dp(study, tp=size, dp=base_dp):
            legal.append(size)

    if len(legal) < 2:
        raise SpecError(
            "interaction screening needs at least two legal TP levels with fixed "
            f"data-parallel-size={base_dp}. Legal levels: {legal}."
        )

    low, high = _adjacent_tp_pair(legal, base_tp)
    scale_rule = (
        "Choose an adjacent legal TP pair around the base TP when possible; "
        "otherwise use the nearest legal pair. DP is fixed to the base value."
    )
    return {
        "knob": "tensor-parallel-size",
        "levels": {"low": low, "high": high},
        "all_legal_values_with_fixed_dp": legal,
        "fixed_data_parallel_size": base_dp,
        "scale_rule": scale_rule,
        "base_value": base_tp,
    }
def _mns_axis(study: StudySpec, window_summary: dict[str, Any]) -> dict[str, Any]:
    """Derive low/high ``max-num-seqs`` levels from the window's arrival stats.

    The concurrency scale C is the largest of ``arrival_qps_1s_p95``,
    ``request_rate``, and 1. Low is 0.75*C and high is 3*C, each rounded up to
    a multiple of 8. High is then raised to at least twice low and capped at
    ``study.trace.max_concurrency``. If the cap leaves high at or below low,
    high is reset to twice low, so it can end up above the cap.
    """
    arrival_p95 = _float_value(window_summary.get("arrival_qps_1s_p95"))
    request_rate = _float_value(window_summary.get("request_rate"))
    concurrency_scale = max(arrival_p95, request_rate, 1.0)

    def scaled_level(factor: float) -> int:
        return _round_up_to_multiple(int(math.ceil(factor * concurrency_scale)), 8)

    low = scaled_level(0.75)
    high = scaled_level(3.0)

    cap = study.trace.max_concurrency
    high = min(max(high, low * 2), cap)
    if high <= low:
        # Keep the two levels distinct even when the cap is at or below low.
        high = low * 2

    levels = {"low": low, "high": high}
    normalized = {
        label: round(level / concurrency_scale, 4)
        for label, level in levels.items()
    }
    return {
        "knob": "max-num-seqs",
        "levels": levels,
        "scale": {
            "concurrency_scale": concurrency_scale,
            "arrival_qps_1s_p95": arrival_p95,
            "request_rate": request_rate,
            "trace_max_concurrency": cap,
        },
        "normalized_levels": normalized,
        "scale_rule": (
            "low=round_up_to_8(0.75*C), high=round_up_to_8(3.0*C), "
            "where C=max(arrival_qps_1s_p95, request_rate, 1)."
        ),
    }
def _mbt_axis(window_summary: dict[str, Any]) -> dict[str, Any]:
    """Derive low/high ``max-num-batched-tokens`` levels from prompt lengths.

    The prompt scale is ``prompt_tokens_p95`` when positive, otherwise the
    larger of ``prompt_tokens_p99`` and 1. Low is the scale rounded up to a
    multiple of 1024; high is four times the scale, rounded up likewise and
    capped at 32768. When that leaves low >= high, low is replaced by half of
    high rounded down to a multiple of 1024 (at least 1024) and a note is
    recorded in the result.
    """
    prompt_p95 = _float_value(window_summary.get("prompt_tokens_p95"))
    prompt_p99 = _float_value(window_summary.get("prompt_tokens_p99"))
    if prompt_p95 > 0:
        prompt_scale = prompt_p95
    else:
        prompt_scale = max(prompt_p99, 1.0)

    step = 1024
    high_cap = 32768
    low = _round_up_to_multiple(int(math.ceil(prompt_scale)), step)
    uncapped_high = _round_up_to_multiple(int(math.ceil(4.0 * prompt_scale)), step)
    high = min(uncapped_high, high_cap)

    notes: list[str] = []
    if low >= high:
        # The cap swallowed the gap between the levels; reopen it below high.
        low = max(step, _round_down_to_multiple(high // 2, step))
        notes.append(
            "low was reduced below prompt_scale because the default high cap would collapse the axis."
        )

    levels = {"low": low, "high": high}
    normalized = {
        label: round(level / prompt_scale, 4)
        for label, level in levels.items()
    }
    return {
        "knob": "max-num-batched-tokens",
        "levels": levels,
        "scale": {
            "prompt_scale": prompt_scale,
            "prompt_tokens_p95": prompt_p95,
            "prompt_tokens_p99": prompt_p99,
            "default_high_cap": high_cap,
        },
        "normalized_levels": normalized,
        "scale_rule": (
            "low=round_up_to_1024(prompt_p95), high=min(round_up_to_1024(4*prompt_p95), 32768)."
        ),
        "notes": notes,
    }
def _corner_configs(axes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
configs: list[dict[str, Any]] = []
for tp_level, mns_level, mbt_level in itertools.product(("low", "high"), repeat=3):
tp = axes["tp"]["levels"][tp_level]
mns = axes["mns"]["levels"][mns_level]
mbt = axes["mbt"]["levels"][mbt_level]
config_id = f"tp-{tp_level}_mns-{mns_level}_mbt-{mbt_level}"
configs.append(
{
"config_id": config_id,
"levels": {
"tp": tp_level,
"mns": mns_level,
"mbt": mbt_level,
},
"config_patch": {
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": tp,
"max-num-seqs": mns,
"max-num-batched-tokens": mbt,
},
},
"normalized_coordinates": {
"mns_over_concurrency_scale": round(
mns / axes["mns"]["scale"]["concurrency_scale"], 4
),
"mbt_over_prompt_scale": round(
mbt / axes["mbt"]["scale"]["prompt_scale"], 4
),
},
}
)
return configs
def _fixed_tunable_base_flags(study: StudySpec) -> dict[str, Any]:
    """Return the base flags that are tunable but held fixed by the screen.

    A base flag is kept when it is listed in ``study.engine.tunable_flags``
    and is not one of ``AXIS_KNOBS``. Values and order follow ``base_flags``.
    """
    held_fixed = set(study.engine.tunable_flags) - set(AXIS_KNOBS)
    return {
        name: setting
        for name, setting in study.engine.base_flags.items()
        if name in held_fixed
    }
def _legal_tp_with_fixed_dp(study: StudySpec, *, tp: int, dp: int) -> bool:
constraints = study.engine.topology_constraints
product = tp * dp
if constraints is None:
return product <= study.hardware.gpu_count
if constraints.allowed_tp_dp_products and product not in constraints.allowed_tp_dp_products:
return False
if not constraints.allowed_tp_dp_products and product > study.hardware.gpu_count:
return False
if constraints.require_tp_dp_product_equals_gpu_count and product != study.hardware.gpu_count:
return False
return True
def _adjacent_tp_pair(values: list[int], base_tp: int) -> tuple[int, int]:
if base_tp in values:
idx = values.index(base_tp)
if idx < len(values) - 1:
return values[idx], values[idx + 1]
return values[idx - 1], values[idx]
larger = [value for value in values if value > base_tp]
if larger:
high = larger[0]
idx = values.index(high)
if idx > 0:
return values[idx - 1], high
return high, values[idx + 1]
return values[-2], values[-1]
def _int_flag(value: Any, *, default: int) -> int:
if isinstance(value, bool):
return default
if isinstance(value, int):
return value
if isinstance(value, float) and value.is_integer():
return int(value)
if isinstance(value, str) and value.strip():
try:
return int(value)
except ValueError:
return default
return default
def _float_value(value: Any) -> float:
if isinstance(value, bool):
return 0.0
if isinstance(value, (int, float)):
return max(0.0, float(value))
return 0.0
def _round_up_to_multiple(value: int, multiple: int) -> int:
return ((max(value, 1) + multiple - 1) // multiple) * multiple
def _round_down_to_multiple(value: int, multiple: int) -> int:
return max(multiple, (max(value, 1) // multiple) * multiple)

View File

@@ -0,0 +1,176 @@
from __future__ import annotations
import contextlib
import io
import json
import tempfile
import unittest
from pathlib import Path
from aituner.cli import main as cli_main
from aituner.interaction_matrix import build_interaction_screening_matrix
from aituner.spec import load_study_spec
from aituner.trace import load_trace_requests, summarize_window
def _write_interaction_study(tmp_path: Path) -> Path:
trace_dir = tmp_path / "trace_windows" / "traces"
trace_dir.mkdir(parents=True)
trace_path = trace_dir / "chat_w1.jsonl"
rows = []
for idx in range(24):
rows.append(
{
"request_id": f"r{idx}",
"timestamp": float(idx // 4),
"sampling_u": 0.1 + idx * 0.01,
"messages": [{"role": "user", "content": "hello"}],
"input_length": 7000 if idx < 20 else 9000,
"output_length": 64,
}
)
with trace_path.open("w", encoding="utf-8") as handle:
for row in rows:
handle.write(json.dumps(row) + "\n")
windows_path = tmp_path / "trace_windows" / "windows.json"
windows_path.write_text(
json.dumps(
{
"u_field": "sampling_u",
"windows": [
{
"window_id": "chat_w1",
"trace_type": "chat",
"trace_file": "traces/chat_w1.jsonl",
"window_start": 0.0,
"window_end": 6.0,
}
],
}
),
encoding="utf-8",
)
study_path = tmp_path / "study.json"
study_path.write_text(
json.dumps(
{
"study_id": "interaction-study",
"hardware": {
"gpu_count": 8,
"gpu_model": "H20",
"host_candidates": ["dash1"],
},
"model": {
"model_id": "qwen",
"served_model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507",
},
"engine": {
"engine_name": "vllm",
"engine_version": "0.20",
"exec_path": "/usr/local/bin/vllm",
"cwd": str(tmp_path),
"host": "127.0.0.1",
"port": 8000,
"healthcheck_path": "/v1/models",
"ready_timeout_s": 30,
"request_timeout_s": 30,
"launch_args": ["serve", "/models/qwen"],
"base_envs": {},
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 2,
"gpu-memory-utilization": 0.7,
},
"tunable_envs": [],
"tunable_flags": [
"tensor-parallel-size",
"gpu-memory-utilization",
"max-num-seqs",
"max-num-batched-tokens",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
"allowed_tp_dp_products": [1, 2, 4, 8],
},
"python_executable": "python3",
},
"trace": {
"windows_path": str(windows_path),
"window_id": "chat_w1",
"u_field": "sampling_u",
"timestamp_field": "timestamp",
"max_concurrency": 64,
},
"slo": {
"target_pass_rate": 0.95,
"ttft_rule": {"kind": "fixed_ms", "threshold_ms": 5000},
"tpot_rule": {"kind": "fixed_ms", "threshold_ms": 120},
},
"search": {
"low": 0.0,
"high": 1.0,
"tolerance": 0.01,
"max_probes": 8,
"sample_seed": 20260325,
},
"llm": {"system_prompt": "Tune it.", "max_history_trials": 8},
}
),
encoding="utf-8",
)
return study_path
class InteractionMatrixTests(unittest.TestCase):
    """Checks for the interaction screening matrix builder and its CLI command."""

    def test_screening_matrix_uses_normalized_axis_scales(self) -> None:
        """The builder yields eight corners with the expected axis levels."""
        with tempfile.TemporaryDirectory() as workdir:
            spec_path = _write_interaction_study(Path(workdir))
            study = load_study_spec(spec_path)
            window, requests = load_trace_requests(study, study_spec_path=spec_path)
            summary = summarize_window(requests, window)
            matrix = build_interaction_screening_matrix(
                study=study,
                window_summary=summary,
            )

        axes = matrix["axes"]
        self.assertEqual(len(matrix["configs"]), 8)
        self.assertEqual(axes["tp"]["levels"], {"low": 2, "high": 4})
        self.assertEqual(axes["mns"]["levels"], {"low": 8, "high": 16})
        self.assertEqual(axes["mbt"]["levels"], {"low": 9216, "high": 32768})
        self.assertEqual(
            matrix["fixed_tunable_base_flags"],
            {"gpu-memory-utilization": 0.7},
        )

        all_high_patch = {
            "tensor-parallel-size": 4,
            "max-num-seqs": 16,
            "max-num-batched-tokens": 32768,
        }
        flag_patches = [
            config["config_patch"]["flag_patch"]
            for config in matrix["configs"]
        ]
        self.assertIn(all_high_patch, flag_patches)

    def test_cli_profile_interaction_matrix_prints_json(self) -> None:
        """``profile interaction-matrix`` exits 0 and prints the matrix as JSON."""
        captured = io.StringIO()
        with tempfile.TemporaryDirectory() as workdir:
            spec_path = _write_interaction_study(Path(workdir))
            argv = ["profile", "interaction-matrix", "--spec", str(spec_path)]
            with contextlib.redirect_stdout(captured):
                exit_code = cli_main(argv)

        self.assertEqual(exit_code, 0)
        payload = json.loads(captured.getvalue())
        self.assertEqual(payload["matrix_kind"], "low_high_factorial_screening")
        self.assertEqual(payload["source"]["window_id"], "chat_w1")
        self.assertEqual(len(payload["configs"]), 8)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()