diff --git a/src/aituner/cli.py b/src/aituner/cli.py index 0155fc6..27c86c9 100644 --- a/src/aituner/cli.py +++ b/src/aituner/cli.py @@ -18,6 +18,7 @@ from .harness import ( build_harness_guided_proposal, build_harness_stop_proposal, ) +from .interaction_matrix import build_interaction_screening_matrix from .job import append_job, build_trial_job from .lca import ( build_study_workload_profile, @@ -1169,6 +1170,18 @@ def cmd_profile_similarity(args: argparse.Namespace) -> int: return 0 +def cmd_profile_interaction_matrix(args: argparse.Namespace) -> int: + spec_path = Path(args.spec).resolve() + study = _load_profile_study_spec(spec_path) + window, requests = load_trace_requests(study, study_spec_path=spec_path) + matrix = build_interaction_screening_matrix( + study=study, + window_summary=summarize_window(requests, window), + ) + print(json.dumps(matrix, ensure_ascii=False, indent=2)) + return 0 + + def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="AITuner CLI") subparsers = parser.add_subparsers(dest="command", required=True) @@ -1290,6 +1303,10 @@ def build_parser() -> argparse.ArgumentParser: ) profile_similarity.set_defaults(func=cmd_profile_similarity) + profile_interaction = profile_sub.add_parser("interaction-matrix") + profile_interaction.add_argument("--spec", required=True) + profile_interaction.set_defaults(func=cmd_profile_interaction_matrix) + return parser diff --git a/src/aituner/interaction_matrix.py b/src/aituner/interaction_matrix.py new file mode 100644 index 0000000..57fa1b1 --- /dev/null +++ b/src/aituner/interaction_matrix.py @@ -0,0 +1,424 @@ +from __future__ import annotations + +import itertools +import math +from typing import Any + +from .spec import SpecError, StudySpec + + +AXIS_KNOBS = ( + "tensor-parallel-size", + "max-num-seqs", + "max-num-batched-tokens", +) + + +def build_interaction_screening_matrix( + *, + study: StudySpec, + window_summary: dict[str, Any], +) -> dict[str, Any]: + """Build the low/high screening matrix for knob-interaction experiments. + + This does not launch any run. It only materializes a reviewable set of fixed + config patches for a two-level factorial screen over TP, MNS, and MBT. + """ + + _require_axis_knobs(study) + tp_axis = _tp_axis(study) + mns_axis = _mns_axis(study, window_summary) + mbt_axis = _mbt_axis(window_summary) + axes = { + "tp": tp_axis, + "mns": mns_axis, + "mbt": mbt_axis, + } + configs = _corner_configs(axes) + return { + "schema_version": 1, + "matrix_kind": "low_high_factorial_screening", + "study_id": study.study_id, + "objective": ( + "Estimate whether TP, max-num-seqs, and max-num-batched-tokens " + "have non-separable effects on feasible request_rate_per_gpu." + ), + "source": { + "window_id": study.trace.window_id, + "request_mode": study.trace.request_mode, + "engine_name": study.engine.engine_name, + "engine_version": study.engine.engine_version, + "hardware_gpu_count": study.hardware.gpu_count, + }, + "window_summary": { + key: window_summary.get(key) + for key in ( + "request_count", + "duration_s", + "request_rate", + "prompt_tokens_p50", + "prompt_tokens_p95", + "prompt_tokens_p99", + "arrival_qps_1s_p50", + "arrival_qps_1s_p95", + "arrival_burst_ratio_p95_to_mean", + ) + }, + "axes": axes, + "fixed_tunable_base_flags": _fixed_tunable_base_flags(study, axes), + "configs": configs, + "recommended_repeats": [ + { + "config_id": configs[0]["config_id"], + "reason": "Estimate low-capacity corner noise and baseline drift.", + }, + { + "config_id": configs[-1]["config_id"], + "reason": "Estimate high-capacity joint-corner noise.", + }, + ], + "analysis_plan": { + "primary_metric": "max feasible request_rate_per_gpu at target pass_rate", + "plot_1": ( + "2x2 MNS/MBT corner heatmaps faceted by TP low/high, colored by " + "request_rate_per_gpu and hatched when infeasible." + ), + "plot_2": ( + "Conditional-effect lines: MNS low/high on x-axis, one line per " + "TP, faceted by MBT low/high." + ), + "plot_3": ( + "Interaction-contrast bars for TPxMNS, TPxMBT, MNSxMBT, and " + "TPxMNSxMBT normalized by the low/low/low corner." + ), + }, + "review_notes": [ + "This matrix is for hypothesis testing only; it does not use LLM or harness tuning.", + "All non-axis tunable base flags remain fixed and are listed in fixed_tunable_base_flags.", + "If a high level is launch-unsafe in a real run, lower it to the largest launch-safe value and keep the blocked corner in the report.", + ], + } + + +def _require_axis_knobs(study: StudySpec) -> None: + tunable = set(study.engine.tunable_flags) + missing = [knob for knob in AXIS_KNOBS if knob not in tunable] + if missing: + raise SpecError( + "interaction screening requires tunable flags: " + + ", ".join(AXIS_KNOBS) + + f". Missing: {', '.join(missing)}." + ) + + +def _tp_axis(study: StudySpec) -> dict[str, Any]: + base_flags = study.engine.base_flags + base_tp = _int_flag(base_flags.get("tensor-parallel-size"), default=1) + base_dp = _int_flag(base_flags.get("data-parallel-size"), default=1) + constraints = study.engine.topology_constraints + if constraints is not None and constraints.allowed_tensor_parallel_sizes: + raw_values = constraints.allowed_tensor_parallel_sizes + else: + raw_values = [1, 2, 4, 8] + legal = [ + int(value) + for value in sorted(set(raw_values)) + if _legal_tp_with_fixed_dp(study, tp=int(value), dp=base_dp) + ] + if len(legal) < 2: + if "data-parallel-size" not in set(study.engine.tunable_flags): + raise SpecError( + "interaction screening needs at least two legal TP levels with fixed " + f"data-parallel-size={base_dp}. Legal levels: {legal}." + ) + topologies = _legal_topologies(study) + if len(topologies) < 2: + raise SpecError( + "interaction screening needs at least two legal TP/DP topologies. " + f"Legal topologies: {topologies}." + ) + low_topology, high_topology = _adjacent_topology_pair(topologies, base_tp) + return { + "knob": "topology", + "primary_knob": "tensor-parallel-size", + "levels": {"low": low_topology, "high": high_topology}, + "level_labels": { + "low": _topology_label(low_topology), + "high": _topology_label(high_topology), + }, + "all_legal_topologies": topologies, + "scale_rule": ( + "Fixed-DP TP levels were unavailable, so choose an adjacent legal " + "TP/DP redistribution around the base TP." + ), + "base_value": { + "tensor-parallel-size": base_tp, + "data-parallel-size": base_dp, + }, + } + low_tp, high_tp = _adjacent_tp_pair(legal, base_tp) + low = {"tensor-parallel-size": low_tp} + high = {"tensor-parallel-size": high_tp} + if base_dp != 1: + low["data-parallel-size"] = base_dp + high["data-parallel-size"] = base_dp + return { + "knob": "tensor-parallel-size", + "levels": {"low": low, "high": high}, + "level_labels": {"low": _topology_label(low), "high": _topology_label(high)}, + "all_legal_values_with_fixed_dp": legal, + "fixed_data_parallel_size": base_dp, + "scale_rule": ( + "Choose an adjacent legal TP pair around the base TP when possible; " + "otherwise use the nearest legal pair. DP is fixed to the base value." + ), + "base_value": base_tp, + } + + +def _mns_axis(study: StudySpec, window_summary: dict[str, Any]) -> dict[str, Any]: + arrival_p95 = _float_value(window_summary.get("arrival_qps_1s_p95")) + request_rate = _float_value(window_summary.get("request_rate")) + concurrency_scale = max(arrival_p95, request_rate, 1.0) + low = _round_up_to_multiple(int(math.ceil(0.75 * concurrency_scale)), 8) + high = _round_up_to_multiple(int(math.ceil(3.0 * concurrency_scale)), 8) + if high <= low: + high = low * 2 + return { + "knob": "max-num-seqs", + "levels": {"low": low, "high": high}, + "scale": { + "concurrency_scale": concurrency_scale, + "arrival_qps_1s_p95": arrival_p95, + "request_rate": request_rate, + "trace_max_concurrency": study.trace.max_concurrency, + }, + "normalized_levels": { + "low": round(low / concurrency_scale, 4), + "high": round(high / concurrency_scale, 4), + }, + "scale_rule": ( + "low=round_up_to_8(0.75*C), high=round_up_to_8(3.0*C), " + "where C=max(arrival_qps_1s_p95, request_rate, 1)." + ), + } + + +def _mbt_axis(window_summary: dict[str, Any]) -> dict[str, Any]: + prompt_p95 = _float_value(window_summary.get("prompt_tokens_p95")) + prompt_p99 = _float_value(window_summary.get("prompt_tokens_p99")) + prompt_scale = prompt_p95 if prompt_p95 > 0 else max(prompt_p99, 1.0) + high_cap = 32768 + low = _round_up_to_multiple(int(math.ceil(prompt_scale)), 1024) + high = _round_up_to_multiple(int(math.ceil(4.0 * prompt_scale)), 1024) + high = min(high, high_cap) + notes: list[str] = [] + if 0 < high_cap - high <= 1024: + high = high_cap + notes.append( + "high was snapped to 32768 because round_up(4*prompt_scale) was within one 1024-token step of the cap." + ) + if low >= high: + low = max(1024, _round_down_to_multiple(high // 2, 1024)) + notes.append( + "low was reduced below prompt_scale because the default high cap would collapse the axis." + ) + return { + "knob": "max-num-batched-tokens", + "levels": {"low": low, "high": high}, + "scale": { + "prompt_scale": prompt_scale, + "prompt_tokens_p95": prompt_p95, + "prompt_tokens_p99": prompt_p99, + "default_high_cap": high_cap, + }, + "normalized_levels": { + "low": round(low / prompt_scale, 4), + "high": round(high / prompt_scale, 4), + }, + "scale_rule": ( + "low=round_up_to_1024(prompt_p95), high=min(round_up_to_1024(4*prompt_p95), 32768)." + ), + "notes": notes, + } + + +def _corner_configs(axes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]: + configs: list[dict[str, Any]] = [] + for tp_level, mns_level, mbt_level in itertools.product(("low", "high"), repeat=3): + topology_patch = dict(axes["tp"]["levels"][tp_level]) + mns = axes["mns"]["levels"][mns_level] + mbt = axes["mbt"]["levels"][mbt_level] + config_id = f"tp-{tp_level}_mns-{mns_level}_mbt-{mbt_level}" + flag_patch = { + **topology_patch, + "max-num-seqs": mns, + "max-num-batched-tokens": mbt, + } + configs.append( + { + "config_id": config_id, + "levels": { + "tp": tp_level, + "mns": mns_level, + "mbt": mbt_level, + }, + "config_patch": { + "env_patch": {}, + "flag_patch": flag_patch, + }, + "normalized_coordinates": { + "topology": axes["tp"]["level_labels"][tp_level], + "mns_over_concurrency_scale": round( + mns / axes["mns"]["scale"]["concurrency_scale"], 4 + ), + "mbt_over_prompt_scale": round( + mbt / axes["mbt"]["scale"]["prompt_scale"], 4 + ), + }, + } + ) + return configs + + +def _fixed_tunable_base_flags(study: StudySpec, axes: dict[str, dict[str, Any]]) -> dict[str, Any]: + tunable = set(study.engine.tunable_flags) + axis = set(AXIS_KNOBS) + tp_levels = axes.get("tp", {}).get("levels") + if isinstance(tp_levels, dict): + for level in tp_levels.values(): + if isinstance(level, dict): + axis.update(str(key) for key in level) + return { + key: value + for key, value in study.engine.base_flags.items() + if key in tunable and key not in axis + } + + +def _legal_tp_with_fixed_dp(study: StudySpec, *, tp: int, dp: int) -> bool: + return _legal_topology(study, tp=tp, dp=dp) + + +def _legal_topologies(study: StudySpec) -> list[dict[str, int]]: + constraints = study.engine.topology_constraints + if constraints is not None and constraints.allowed_tensor_parallel_sizes: + tp_values = constraints.allowed_tensor_parallel_sizes + else: + tp_values = [1, 2, 4, 8] + if constraints is not None and constraints.allowed_data_parallel_sizes: + dp_values = constraints.allowed_data_parallel_sizes + else: + dp_values = [1] + topologies = [] + for tp in sorted(set(int(item) for item in tp_values)): + for dp in sorted(set(int(item) for item in dp_values)): + if _legal_topology(study, tp=tp, dp=dp): + topologies.append( + { + "tensor-parallel-size": tp, + "data-parallel-size": dp, + } + ) + topologies.sort(key=lambda item: (item["tensor-parallel-size"], item["data-parallel-size"])) + return topologies + + +def _legal_topology(study: StudySpec, *, tp: int, dp: int) -> bool: + constraints = study.engine.topology_constraints + product = tp * dp + if constraints is None: + return product <= study.hardware.gpu_count + if constraints.allowed_tensor_parallel_sizes and tp not in constraints.allowed_tensor_parallel_sizes: + return False + if constraints.allowed_data_parallel_sizes and dp not in constraints.allowed_data_parallel_sizes: + return False + if constraints.allowed_tp_dp_products and product not in constraints.allowed_tp_dp_products: + return False + if not constraints.allowed_tp_dp_products and product > study.hardware.gpu_count: + return False + if constraints.require_tp_dp_product_equals_gpu_count and product != study.hardware.gpu_count: + return False + return True + + +def _adjacent_tp_pair(values: list[int], base_tp: int) -> tuple[int, int]: + if base_tp in values: + idx = values.index(base_tp) + if idx < len(values) - 1: + return values[idx], values[idx + 1] + return values[idx - 1], values[idx] + larger = [value for value in values if value > base_tp] + if larger: + high = larger[0] + idx = values.index(high) + if idx > 0: + return values[idx - 1], high + return high, values[idx + 1] + return values[-2], values[-1] + + +def _adjacent_topology_pair( + topologies: list[dict[str, int]], + base_tp: int, +) -> tuple[dict[str, int], dict[str, int]]: + by_tp = [] + seen: set[int] = set() + for item in topologies: + tp = item["tensor-parallel-size"] + if tp in seen: + continue + seen.add(tp) + by_tp.append(item) + if len(by_tp) < 2: + return topologies[0], topologies[-1] + tp_values = [item["tensor-parallel-size"] for item in by_tp] + if base_tp in tp_values: + idx = tp_values.index(base_tp) + if idx < len(by_tp) - 1: + return by_tp[idx], by_tp[idx + 1] + return by_tp[idx - 1], by_tp[idx] + larger = [idx for idx, tp in enumerate(tp_values) if tp > base_tp] + if larger: + idx = larger[0] + if idx > 0: + return by_tp[idx - 1], by_tp[idx] + return by_tp[idx], by_tp[idx + 1] + return by_tp[-2], by_tp[-1] + + +def _topology_label(topology: dict[str, int]) -> str: + tp = topology.get("tensor-parallel-size") + dp = topology.get("data-parallel-size", 1) + return f"TP{tp}/DP{dp}" + + +def _int_flag(value: Any, *, default: int) -> int: + if isinstance(value, bool): + return default + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + if isinstance(value, str) and value.strip(): + try: + return int(value) + except ValueError: + return default + return default + + +def _float_value(value: Any) -> float: + if isinstance(value, bool): + return 0.0 + if isinstance(value, (int, float)): + return max(0.0, float(value)) + return 0.0 + + +def _round_up_to_multiple(value: int, multiple: int) -> int: + return ((max(value, 1) + multiple - 1) // multiple) * multiple + + +def _round_down_to_multiple(value: int, multiple: int) -> int: + return max(multiple, (max(value, 1) // multiple) * multiple) diff --git a/tests/test_interaction_matrix.py b/tests/test_interaction_matrix.py new file mode 100644 index 0000000..e194e9a --- /dev/null +++ b/tests/test_interaction_matrix.py @@ -0,0 +1,221 @@ +from __future__ import annotations + +import contextlib +import io +import json +import tempfile +import unittest +from pathlib import Path + +from aituner.cli import main as cli_main +from aituner.interaction_matrix import build_interaction_screening_matrix +from aituner.spec import load_study_spec +from aituner.trace import load_trace_requests, summarize_window + + +def _write_interaction_study(tmp_path: Path) -> Path: + trace_dir = tmp_path / "trace_windows" / "traces" + trace_dir.mkdir(parents=True) + trace_path = trace_dir / "chat_w1.jsonl" + rows = [] + for idx in range(24): + rows.append( + { + "request_id": f"r{idx}", + "timestamp": float(idx // 4), + "sampling_u": 0.1 + idx * 0.01, + "messages": [{"role": "user", "content": "hello"}], + "input_length": 7000 if idx < 20 else 9000, + "output_length": 64, + } + ) + with trace_path.open("w", encoding="utf-8") as handle: + for row in rows: + handle.write(json.dumps(row) + "\n") + + windows_path = tmp_path / "trace_windows" / "windows.json" + windows_path.write_text( + json.dumps( + { + "u_field": "sampling_u", + "windows": [ + { + "window_id": "chat_w1", + "trace_type": "chat", + "trace_file": "traces/chat_w1.jsonl", + "window_start": 0.0, + "window_end": 6.0, + } + ], + } + ), + encoding="utf-8", + ) + + study_path = tmp_path / "study.json" + study_path.write_text( + json.dumps( + { + "study_id": "interaction-study", + "hardware": { + "gpu_count": 8, + "gpu_model": "H20", + "host_candidates": ["dash1"], + }, + "model": { + "model_id": "qwen", + "served_model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507", + }, + "engine": { + "engine_name": "vllm", + "engine_version": "0.20", + "exec_path": "/usr/local/bin/vllm", + "cwd": str(tmp_path), + "host": "127.0.0.1", + "port": 8000, + "healthcheck_path": "/v1/models", + "ready_timeout_s": 30, + "request_timeout_s": 30, + "launch_args": ["serve", "/models/qwen"], + "base_envs": {}, + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 2, + "gpu-memory-utilization": 0.7, + }, + "tunable_envs": [], + "tunable_flags": [ + "tensor-parallel-size", + "gpu-memory-utilization", + "max-num-seqs", + "max-num-batched-tokens", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_tp_dp_products": [1, 2, 4, 8], + }, + "python_executable": "python3", + }, + "trace": { + "windows_path": str(windows_path), + "window_id": "chat_w1", + "u_field": "sampling_u", + "timestamp_field": "timestamp", + "max_concurrency": 64, + }, + "slo": { + "target_pass_rate": 0.95, + "ttft_rule": {"kind": "fixed_ms", "threshold_ms": 5000}, + "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 120}, + }, + "search": { + "low": 0.0, + "high": 1.0, + "tolerance": 0.01, + "max_probes": 8, + "sample_seed": 20260325, + }, + "llm": {"system_prompt": "Tune it.", "max_history_trials": 8}, + } + ), + encoding="utf-8", + ) + return study_path + + +class InteractionMatrixTests(unittest.TestCase): + def test_screening_matrix_uses_normalized_axis_scales(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + study_path = _write_interaction_study(Path(tmp)) + study = load_study_spec(study_path) + window, requests = load_trace_requests(study, study_spec_path=study_path) + + matrix = build_interaction_screening_matrix( + study=study, + window_summary=summarize_window(requests, window), + ) + + self.assertEqual(len(matrix["configs"]), 8) + self.assertEqual( + matrix["axes"]["tp"]["levels"], + { + "low": {"tensor-parallel-size": 2}, + "high": {"tensor-parallel-size": 4}, + }, + ) + self.assertEqual(matrix["axes"]["mns"]["levels"], {"low": 8, "high": 16}) + self.assertEqual(matrix["axes"]["mbt"]["levels"], {"low": 9216, "high": 32768}) + self.assertEqual( + matrix["fixed_tunable_base_flags"], + {"gpu-memory-utilization": 0.7}, + ) + patches = [ + item["config_patch"]["flag_patch"] + for item in matrix["configs"] + ] + self.assertIn( + { + "tensor-parallel-size": 4, + "max-num-seqs": 16, + "max-num-batched-tokens": 32768, + }, + patches, + ) + + def test_screening_matrix_can_use_tp_dp_redistribution(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + study_path = _write_interaction_study(Path(tmp)) + payload = json.loads(study_path.read_text(encoding="utf-8")) + payload["engine"]["base_flags"]["data-parallel-size"] = 2 + payload["engine"]["tunable_flags"].append("data-parallel-size") + payload["engine"]["topology_constraints"] = { + "require_tp_dp_product_equals_gpu_count": True, + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_data_parallel_sizes": [1, 2, 4, 8], + } + study_path.write_text(json.dumps(payload), encoding="utf-8") + study = load_study_spec(study_path) + window, requests = load_trace_requests(study, study_spec_path=study_path) + + matrix = build_interaction_screening_matrix( + study=study, + window_summary=summarize_window(requests, window), + ) + + self.assertEqual(matrix["axes"]["tp"]["knob"], "topology") + self.assertEqual( + matrix["axes"]["tp"]["levels"], + { + "low": {"tensor-parallel-size": 2, "data-parallel-size": 4}, + "high": {"tensor-parallel-size": 4, "data-parallel-size": 2}, + }, + ) + self.assertNotIn("data-parallel-size", matrix["fixed_tunable_base_flags"]) + self.assertIn( + { + "tensor-parallel-size": 4, + "data-parallel-size": 2, + "max-num-seqs": 16, + "max-num-batched-tokens": 32768, + }, + [item["config_patch"]["flag_patch"] for item in matrix["configs"]], + ) + + def test_cli_profile_interaction_matrix_prints_json(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + study_path = _write_interaction_study(Path(tmp)) + stdout = io.StringIO() + + with contextlib.redirect_stdout(stdout): + rc = cli_main(["profile", "interaction-matrix", "--spec", str(study_path)]) + + self.assertEqual(rc, 0) + payload = json.loads(stdout.getvalue()) + self.assertEqual(payload["matrix_kind"], "low_high_factorial_screening") + self.assertEqual(payload["source"]["window_id"], "chat_w1") + self.assertEqual(len(payload["configs"]), 8) + + +if __name__ == "__main__": + unittest.main()