Add interaction screening matrix generator
This commit is contained in:
@@ -18,6 +18,7 @@ from .harness import (
|
||||
build_harness_guided_proposal,
|
||||
build_harness_stop_proposal,
|
||||
)
|
||||
from .interaction_matrix import build_interaction_screening_matrix
|
||||
from .job import append_job, build_trial_job
|
||||
from .lca import (
|
||||
build_study_workload_profile,
|
||||
@@ -1169,6 +1170,18 @@ def cmd_profile_similarity(args: argparse.Namespace) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_profile_interaction_matrix(args: argparse.Namespace) -> int:
|
||||
spec_path = Path(args.spec).resolve()
|
||||
study = _load_profile_study_spec(spec_path)
|
||||
window, requests = load_trace_requests(study, study_spec_path=spec_path)
|
||||
matrix = build_interaction_screening_matrix(
|
||||
study=study,
|
||||
window_summary=summarize_window(requests, window),
|
||||
)
|
||||
print(json.dumps(matrix, ensure_ascii=False, indent=2))
|
||||
return 0
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="AITuner CLI")
|
||||
subparsers = parser.add_subparsers(dest="command", required=True)
|
||||
@@ -1290,6 +1303,10 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
)
|
||||
profile_similarity.set_defaults(func=cmd_profile_similarity)
|
||||
|
||||
profile_interaction = profile_sub.add_parser("interaction-matrix")
|
||||
profile_interaction.add_argument("--spec", required=True)
|
||||
profile_interaction.set_defaults(func=cmd_profile_interaction_matrix)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
|
||||
414
src/aituner/interaction_matrix.py
Normal file
414
src/aituner/interaction_matrix.py
Normal file
@@ -0,0 +1,414 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import itertools
|
||||
import math
|
||||
from typing import Any
|
||||
|
||||
from .spec import SpecError, StudySpec
|
||||
|
||||
|
||||
AXIS_KNOBS = (
|
||||
"tensor-parallel-size",
|
||||
"max-num-seqs",
|
||||
"max-num-batched-tokens",
|
||||
)
|
||||
|
||||
|
||||
def build_interaction_screening_matrix(
|
||||
*,
|
||||
study: StudySpec,
|
||||
window_summary: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
"""Build the low/high screening matrix for knob-interaction experiments.
|
||||
|
||||
This does not launch any run. It only materializes a reviewable set of fixed
|
||||
config patches for a two-level factorial screen over TP, MNS, and MBT.
|
||||
"""
|
||||
|
||||
_require_axis_knobs(study)
|
||||
tp_axis = _tp_axis(study)
|
||||
mns_axis = _mns_axis(study, window_summary)
|
||||
mbt_axis = _mbt_axis(window_summary)
|
||||
axes = {
|
||||
"tp": tp_axis,
|
||||
"mns": mns_axis,
|
||||
"mbt": mbt_axis,
|
||||
}
|
||||
configs = _corner_configs(axes)
|
||||
return {
|
||||
"schema_version": 1,
|
||||
"matrix_kind": "low_high_factorial_screening",
|
||||
"study_id": study.study_id,
|
||||
"objective": (
|
||||
"Estimate whether TP, max-num-seqs, and max-num-batched-tokens "
|
||||
"have non-separable effects on feasible request_rate_per_gpu."
|
||||
),
|
||||
"source": {
|
||||
"window_id": study.trace.window_id,
|
||||
"request_mode": study.trace.request_mode,
|
||||
"engine_name": study.engine.engine_name,
|
||||
"engine_version": study.engine.engine_version,
|
||||
"hardware_gpu_count": study.hardware.gpu_count,
|
||||
},
|
||||
"window_summary": {
|
||||
key: window_summary.get(key)
|
||||
for key in (
|
||||
"request_count",
|
||||
"duration_s",
|
||||
"request_rate",
|
||||
"prompt_tokens_p50",
|
||||
"prompt_tokens_p95",
|
||||
"prompt_tokens_p99",
|
||||
"arrival_qps_1s_p50",
|
||||
"arrival_qps_1s_p95",
|
||||
"arrival_burst_ratio_p95_to_mean",
|
||||
)
|
||||
},
|
||||
"axes": axes,
|
||||
"fixed_tunable_base_flags": _fixed_tunable_base_flags(study),
|
||||
"configs": configs,
|
||||
"recommended_repeats": [
|
||||
{
|
||||
"config_id": configs[0]["config_id"],
|
||||
"reason": "Estimate low-capacity corner noise and baseline drift.",
|
||||
},
|
||||
{
|
||||
"config_id": configs[-1]["config_id"],
|
||||
"reason": "Estimate high-capacity joint-corner noise.",
|
||||
},
|
||||
],
|
||||
"analysis_plan": {
|
||||
"primary_metric": "max feasible request_rate_per_gpu at target pass_rate",
|
||||
"plot_1": (
|
||||
"2x2 MNS/MBT corner heatmaps faceted by TP low/high, colored by "
|
||||
"request_rate_per_gpu and hatched when infeasible."
|
||||
),
|
||||
"plot_2": (
|
||||
"Conditional-effect lines: MNS low/high on x-axis, one line per "
|
||||
"TP, faceted by MBT low/high."
|
||||
),
|
||||
"plot_3": (
|
||||
"Interaction-contrast bars for TPxMNS, TPxMBT, MNSxMBT, and "
|
||||
"TPxMNSxMBT normalized by the low/low/low corner."
|
||||
),
|
||||
},
|
||||
"review_notes": [
|
||||
"This matrix is for hypothesis testing only; it does not use LLM or harness tuning.",
|
||||
"All non-axis tunable base flags remain fixed and are listed in fixed_tunable_base_flags.",
|
||||
"If a high level is launch-unsafe in a real run, lower it to the largest launch-safe value and keep the blocked corner in the report.",
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def _require_axis_knobs(study: StudySpec) -> None:
|
||||
tunable = set(study.engine.tunable_flags)
|
||||
missing = [knob for knob in AXIS_KNOBS if knob not in tunable]
|
||||
if missing:
|
||||
raise SpecError(
|
||||
"interaction screening requires tunable flags: "
|
||||
+ ", ".join(AXIS_KNOBS)
|
||||
+ f". Missing: {', '.join(missing)}."
|
||||
)
|
||||
|
||||
|
||||
def _tp_axis(study: StudySpec) -> dict[str, Any]:
|
||||
base_flags = study.engine.base_flags
|
||||
base_tp = _int_flag(base_flags.get("tensor-parallel-size"), default=1)
|
||||
base_dp = _int_flag(base_flags.get("data-parallel-size"), default=1)
|
||||
constraints = study.engine.topology_constraints
|
||||
if constraints is not None and constraints.allowed_tensor_parallel_sizes:
|
||||
raw_values = constraints.allowed_tensor_parallel_sizes
|
||||
else:
|
||||
raw_values = [1, 2, 4, 8]
|
||||
legal = [
|
||||
int(value)
|
||||
for value in sorted(set(raw_values))
|
||||
if _legal_tp_with_fixed_dp(study, tp=int(value), dp=base_dp)
|
||||
]
|
||||
if len(legal) < 2:
|
||||
if "data-parallel-size" not in set(study.engine.tunable_flags):
|
||||
raise SpecError(
|
||||
"interaction screening needs at least two legal TP levels with fixed "
|
||||
f"data-parallel-size={base_dp}. Legal levels: {legal}."
|
||||
)
|
||||
topologies = _legal_topologies(study)
|
||||
if len(topologies) < 2:
|
||||
raise SpecError(
|
||||
"interaction screening needs at least two legal TP/DP topologies. "
|
||||
f"Legal topologies: {topologies}."
|
||||
)
|
||||
low_topology, high_topology = _adjacent_topology_pair(topologies, base_tp)
|
||||
return {
|
||||
"knob": "topology",
|
||||
"primary_knob": "tensor-parallel-size",
|
||||
"levels": {"low": low_topology, "high": high_topology},
|
||||
"level_labels": {
|
||||
"low": _topology_label(low_topology),
|
||||
"high": _topology_label(high_topology),
|
||||
},
|
||||
"all_legal_topologies": topologies,
|
||||
"scale_rule": (
|
||||
"Fixed-DP TP levels were unavailable, so choose an adjacent legal "
|
||||
"TP/DP redistribution around the base TP."
|
||||
),
|
||||
"base_value": {
|
||||
"tensor-parallel-size": base_tp,
|
||||
"data-parallel-size": base_dp,
|
||||
},
|
||||
}
|
||||
low_tp, high_tp = _adjacent_tp_pair(legal, base_tp)
|
||||
low = {"tensor-parallel-size": low_tp}
|
||||
high = {"tensor-parallel-size": high_tp}
|
||||
if base_dp != 1:
|
||||
low["data-parallel-size"] = base_dp
|
||||
high["data-parallel-size"] = base_dp
|
||||
return {
|
||||
"knob": "tensor-parallel-size",
|
||||
"levels": {"low": low, "high": high},
|
||||
"level_labels": {"low": _topology_label(low), "high": _topology_label(high)},
|
||||
"all_legal_values_with_fixed_dp": legal,
|
||||
"fixed_data_parallel_size": base_dp,
|
||||
"scale_rule": (
|
||||
"Choose an adjacent legal TP pair around the base TP when possible; "
|
||||
"otherwise use the nearest legal pair. DP is fixed to the base value."
|
||||
),
|
||||
"base_value": base_tp,
|
||||
}
|
||||
|
||||
|
||||
def _mns_axis(study: StudySpec, window_summary: dict[str, Any]) -> dict[str, Any]:
|
||||
arrival_p95 = _float_value(window_summary.get("arrival_qps_1s_p95"))
|
||||
request_rate = _float_value(window_summary.get("request_rate"))
|
||||
concurrency_scale = max(arrival_p95, request_rate, 1.0)
|
||||
low = _round_up_to_multiple(int(math.ceil(0.75 * concurrency_scale)), 8)
|
||||
high = _round_up_to_multiple(int(math.ceil(3.0 * concurrency_scale)), 8)
|
||||
if high <= low:
|
||||
high = low * 2
|
||||
return {
|
||||
"knob": "max-num-seqs",
|
||||
"levels": {"low": low, "high": high},
|
||||
"scale": {
|
||||
"concurrency_scale": concurrency_scale,
|
||||
"arrival_qps_1s_p95": arrival_p95,
|
||||
"request_rate": request_rate,
|
||||
"trace_max_concurrency": study.trace.max_concurrency,
|
||||
},
|
||||
"normalized_levels": {
|
||||
"low": round(low / concurrency_scale, 4),
|
||||
"high": round(high / concurrency_scale, 4),
|
||||
},
|
||||
"scale_rule": (
|
||||
"low=round_up_to_8(0.75*C), high=round_up_to_8(3.0*C), "
|
||||
"where C=max(arrival_qps_1s_p95, request_rate, 1)."
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def _mbt_axis(window_summary: dict[str, Any]) -> dict[str, Any]:
|
||||
prompt_p95 = _float_value(window_summary.get("prompt_tokens_p95"))
|
||||
prompt_p99 = _float_value(window_summary.get("prompt_tokens_p99"))
|
||||
prompt_scale = prompt_p95 if prompt_p95 > 0 else max(prompt_p99, 1.0)
|
||||
high_cap = 32768
|
||||
low = _round_up_to_multiple(int(math.ceil(prompt_scale)), 1024)
|
||||
high = _round_up_to_multiple(int(math.ceil(4.0 * prompt_scale)), 1024)
|
||||
high = min(high, high_cap)
|
||||
notes: list[str] = []
|
||||
if low >= high:
|
||||
low = max(1024, _round_down_to_multiple(high // 2, 1024))
|
||||
notes.append(
|
||||
"low was reduced below prompt_scale because the default high cap would collapse the axis."
|
||||
)
|
||||
return {
|
||||
"knob": "max-num-batched-tokens",
|
||||
"levels": {"low": low, "high": high},
|
||||
"scale": {
|
||||
"prompt_scale": prompt_scale,
|
||||
"prompt_tokens_p95": prompt_p95,
|
||||
"prompt_tokens_p99": prompt_p99,
|
||||
"default_high_cap": high_cap,
|
||||
},
|
||||
"normalized_levels": {
|
||||
"low": round(low / prompt_scale, 4),
|
||||
"high": round(high / prompt_scale, 4),
|
||||
},
|
||||
"scale_rule": (
|
||||
"low=round_up_to_1024(prompt_p95), high=min(round_up_to_1024(4*prompt_p95), 32768)."
|
||||
),
|
||||
"notes": notes,
|
||||
}
|
||||
|
||||
|
||||
def _corner_configs(axes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
configs: list[dict[str, Any]] = []
|
||||
for tp_level, mns_level, mbt_level in itertools.product(("low", "high"), repeat=3):
|
||||
topology_patch = dict(axes["tp"]["levels"][tp_level])
|
||||
mns = axes["mns"]["levels"][mns_level]
|
||||
mbt = axes["mbt"]["levels"][mbt_level]
|
||||
config_id = f"tp-{tp_level}_mns-{mns_level}_mbt-{mbt_level}"
|
||||
flag_patch = {
|
||||
**topology_patch,
|
||||
"max-num-seqs": mns,
|
||||
"max-num-batched-tokens": mbt,
|
||||
}
|
||||
configs.append(
|
||||
{
|
||||
"config_id": config_id,
|
||||
"levels": {
|
||||
"tp": tp_level,
|
||||
"mns": mns_level,
|
||||
"mbt": mbt_level,
|
||||
},
|
||||
"config_patch": {
|
||||
"env_patch": {},
|
||||
"flag_patch": flag_patch,
|
||||
},
|
||||
"normalized_coordinates": {
|
||||
"topology": axes["tp"]["level_labels"][tp_level],
|
||||
"mns_over_concurrency_scale": round(
|
||||
mns / axes["mns"]["scale"]["concurrency_scale"], 4
|
||||
),
|
||||
"mbt_over_prompt_scale": round(
|
||||
mbt / axes["mbt"]["scale"]["prompt_scale"], 4
|
||||
),
|
||||
},
|
||||
}
|
||||
)
|
||||
return configs
|
||||
|
||||
|
||||
def _fixed_tunable_base_flags(study: StudySpec) -> dict[str, Any]:
|
||||
tunable = set(study.engine.tunable_flags)
|
||||
axis = set(AXIS_KNOBS)
|
||||
return {
|
||||
key: value
|
||||
for key, value in study.engine.base_flags.items()
|
||||
if key in tunable and key not in axis
|
||||
}
|
||||
|
||||
|
||||
def _legal_tp_with_fixed_dp(study: StudySpec, *, tp: int, dp: int) -> bool:
|
||||
return _legal_topology(study, tp=tp, dp=dp)
|
||||
|
||||
|
||||
def _legal_topologies(study: StudySpec) -> list[dict[str, int]]:
|
||||
constraints = study.engine.topology_constraints
|
||||
if constraints is not None and constraints.allowed_tensor_parallel_sizes:
|
||||
tp_values = constraints.allowed_tensor_parallel_sizes
|
||||
else:
|
||||
tp_values = [1, 2, 4, 8]
|
||||
if constraints is not None and constraints.allowed_data_parallel_sizes:
|
||||
dp_values = constraints.allowed_data_parallel_sizes
|
||||
else:
|
||||
dp_values = [1]
|
||||
topologies = []
|
||||
for tp in sorted(set(int(item) for item in tp_values)):
|
||||
for dp in sorted(set(int(item) for item in dp_values)):
|
||||
if _legal_topology(study, tp=tp, dp=dp):
|
||||
topologies.append(
|
||||
{
|
||||
"tensor-parallel-size": tp,
|
||||
"data-parallel-size": dp,
|
||||
}
|
||||
)
|
||||
topologies.sort(key=lambda item: (item["tensor-parallel-size"], item["data-parallel-size"]))
|
||||
return topologies
|
||||
|
||||
|
||||
def _legal_topology(study: StudySpec, *, tp: int, dp: int) -> bool:
|
||||
constraints = study.engine.topology_constraints
|
||||
product = tp * dp
|
||||
if constraints is None:
|
||||
return product <= study.hardware.gpu_count
|
||||
if constraints.allowed_tensor_parallel_sizes and tp not in constraints.allowed_tensor_parallel_sizes:
|
||||
return False
|
||||
if constraints.allowed_data_parallel_sizes and dp not in constraints.allowed_data_parallel_sizes:
|
||||
return False
|
||||
if constraints.allowed_tp_dp_products and product not in constraints.allowed_tp_dp_products:
|
||||
return False
|
||||
if not constraints.allowed_tp_dp_products and product > study.hardware.gpu_count:
|
||||
return False
|
||||
if constraints.require_tp_dp_product_equals_gpu_count and product != study.hardware.gpu_count:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _adjacent_tp_pair(values: list[int], base_tp: int) -> tuple[int, int]:
|
||||
if base_tp in values:
|
||||
idx = values.index(base_tp)
|
||||
if idx < len(values) - 1:
|
||||
return values[idx], values[idx + 1]
|
||||
return values[idx - 1], values[idx]
|
||||
larger = [value for value in values if value > base_tp]
|
||||
if larger:
|
||||
high = larger[0]
|
||||
idx = values.index(high)
|
||||
if idx > 0:
|
||||
return values[idx - 1], high
|
||||
return high, values[idx + 1]
|
||||
return values[-2], values[-1]
|
||||
|
||||
|
||||
def _adjacent_topology_pair(
|
||||
topologies: list[dict[str, int]],
|
||||
base_tp: int,
|
||||
) -> tuple[dict[str, int], dict[str, int]]:
|
||||
by_tp = []
|
||||
seen: set[int] = set()
|
||||
for item in topologies:
|
||||
tp = item["tensor-parallel-size"]
|
||||
if tp in seen:
|
||||
continue
|
||||
seen.add(tp)
|
||||
by_tp.append(item)
|
||||
if len(by_tp) < 2:
|
||||
return topologies[0], topologies[-1]
|
||||
tp_values = [item["tensor-parallel-size"] for item in by_tp]
|
||||
if base_tp in tp_values:
|
||||
idx = tp_values.index(base_tp)
|
||||
if idx < len(by_tp) - 1:
|
||||
return by_tp[idx], by_tp[idx + 1]
|
||||
return by_tp[idx - 1], by_tp[idx]
|
||||
larger = [idx for idx, tp in enumerate(tp_values) if tp > base_tp]
|
||||
if larger:
|
||||
idx = larger[0]
|
||||
if idx > 0:
|
||||
return by_tp[idx - 1], by_tp[idx]
|
||||
return by_tp[idx], by_tp[idx + 1]
|
||||
return by_tp[-2], by_tp[-1]
|
||||
|
||||
|
||||
def _topology_label(topology: dict[str, int]) -> str:
|
||||
tp = topology.get("tensor-parallel-size")
|
||||
dp = topology.get("data-parallel-size", 1)
|
||||
return f"TP{tp}/DP{dp}"
|
||||
|
||||
|
||||
def _int_flag(value: Any, *, default: int) -> int:
|
||||
if isinstance(value, bool):
|
||||
return default
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
if isinstance(value, float) and value.is_integer():
|
||||
return int(value)
|
||||
if isinstance(value, str) and value.strip():
|
||||
try:
|
||||
return int(value)
|
||||
except ValueError:
|
||||
return default
|
||||
return default
|
||||
|
||||
|
||||
def _float_value(value: Any) -> float:
|
||||
if isinstance(value, bool):
|
||||
return 0.0
|
||||
if isinstance(value, (int, float)):
|
||||
return max(0.0, float(value))
|
||||
return 0.0
|
||||
|
||||
|
||||
def _round_up_to_multiple(value: int, multiple: int) -> int:
|
||||
return ((max(value, 1) + multiple - 1) // multiple) * multiple
|
||||
|
||||
|
||||
def _round_down_to_multiple(value: int, multiple: int) -> int:
|
||||
return max(multiple, (max(value, 1) // multiple) * multiple)
|
||||
220
tests/test_interaction_matrix.py
Normal file
220
tests/test_interaction_matrix.py
Normal file
@@ -0,0 +1,220 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import io
|
||||
import json
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from aituner.cli import main as cli_main
|
||||
from aituner.interaction_matrix import build_interaction_screening_matrix
|
||||
from aituner.spec import load_study_spec
|
||||
from aituner.trace import load_trace_requests, summarize_window
|
||||
|
||||
|
||||
def _write_interaction_study(tmp_path: Path) -> Path:
|
||||
trace_dir = tmp_path / "trace_windows" / "traces"
|
||||
trace_dir.mkdir(parents=True)
|
||||
trace_path = trace_dir / "chat_w1.jsonl"
|
||||
rows = []
|
||||
for idx in range(24):
|
||||
rows.append(
|
||||
{
|
||||
"request_id": f"r{idx}",
|
||||
"timestamp": float(idx // 4),
|
||||
"sampling_u": 0.1 + idx * 0.01,
|
||||
"messages": [{"role": "user", "content": "hello"}],
|
||||
"input_length": 7000 if idx < 20 else 9000,
|
||||
"output_length": 64,
|
||||
}
|
||||
)
|
||||
with trace_path.open("w", encoding="utf-8") as handle:
|
||||
for row in rows:
|
||||
handle.write(json.dumps(row) + "\n")
|
||||
|
||||
windows_path = tmp_path / "trace_windows" / "windows.json"
|
||||
windows_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"u_field": "sampling_u",
|
||||
"windows": [
|
||||
{
|
||||
"window_id": "chat_w1",
|
||||
"trace_type": "chat",
|
||||
"trace_file": "traces/chat_w1.jsonl",
|
||||
"window_start": 0.0,
|
||||
"window_end": 6.0,
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
study_path = tmp_path / "study.json"
|
||||
study_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"study_id": "interaction-study",
|
||||
"hardware": {
|
||||
"gpu_count": 8,
|
||||
"gpu_model": "H20",
|
||||
"host_candidates": ["dash1"],
|
||||
},
|
||||
"model": {
|
||||
"model_id": "qwen",
|
||||
"served_model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507",
|
||||
},
|
||||
"engine": {
|
||||
"engine_name": "vllm",
|
||||
"engine_version": "0.20",
|
||||
"exec_path": "/usr/local/bin/vllm",
|
||||
"cwd": str(tmp_path),
|
||||
"host": "127.0.0.1",
|
||||
"port": 8000,
|
||||
"healthcheck_path": "/v1/models",
|
||||
"ready_timeout_s": 30,
|
||||
"request_timeout_s": 30,
|
||||
"launch_args": ["serve", "/models/qwen"],
|
||||
"base_envs": {},
|
||||
"base_flags": {
|
||||
"host": "127.0.0.1",
|
||||
"port": 8000,
|
||||
"tensor-parallel-size": 2,
|
||||
"gpu-memory-utilization": 0.7,
|
||||
},
|
||||
"tunable_envs": [],
|
||||
"tunable_flags": [
|
||||
"tensor-parallel-size",
|
||||
"gpu-memory-utilization",
|
||||
"max-num-seqs",
|
||||
"max-num-batched-tokens",
|
||||
],
|
||||
"topology_constraints": {
|
||||
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
|
||||
"allowed_tp_dp_products": [1, 2, 4, 8],
|
||||
},
|
||||
"python_executable": "python3",
|
||||
},
|
||||
"trace": {
|
||||
"windows_path": str(windows_path),
|
||||
"window_id": "chat_w1",
|
||||
"u_field": "sampling_u",
|
||||
"timestamp_field": "timestamp",
|
||||
"max_concurrency": 64,
|
||||
},
|
||||
"slo": {
|
||||
"target_pass_rate": 0.95,
|
||||
"ttft_rule": {"kind": "fixed_ms", "threshold_ms": 5000},
|
||||
"tpot_rule": {"kind": "fixed_ms", "threshold_ms": 120},
|
||||
},
|
||||
"search": {
|
||||
"low": 0.0,
|
||||
"high": 1.0,
|
||||
"tolerance": 0.01,
|
||||
"max_probes": 8,
|
||||
"sample_seed": 20260325,
|
||||
},
|
||||
"llm": {"system_prompt": "Tune it.", "max_history_trials": 8},
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
return study_path
|
||||
|
||||
|
||||
class InteractionMatrixTests(unittest.TestCase):
|
||||
def test_screening_matrix_uses_normalized_axis_scales(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
study_path = _write_interaction_study(Path(tmp))
|
||||
study = load_study_spec(study_path)
|
||||
window, requests = load_trace_requests(study, study_spec_path=study_path)
|
||||
|
||||
matrix = build_interaction_screening_matrix(
|
||||
study=study,
|
||||
window_summary=summarize_window(requests, window),
|
||||
)
|
||||
|
||||
self.assertEqual(len(matrix["configs"]), 8)
|
||||
self.assertEqual(
|
||||
matrix["axes"]["tp"]["levels"],
|
||||
{
|
||||
"low": {"tensor-parallel-size": 2},
|
||||
"high": {"tensor-parallel-size": 4},
|
||||
},
|
||||
)
|
||||
self.assertEqual(matrix["axes"]["mns"]["levels"], {"low": 8, "high": 16})
|
||||
self.assertEqual(matrix["axes"]["mbt"]["levels"], {"low": 9216, "high": 32768})
|
||||
self.assertEqual(
|
||||
matrix["fixed_tunable_base_flags"],
|
||||
{"gpu-memory-utilization": 0.7},
|
||||
)
|
||||
patches = [
|
||||
item["config_patch"]["flag_patch"]
|
||||
for item in matrix["configs"]
|
||||
]
|
||||
self.assertIn(
|
||||
{
|
||||
"tensor-parallel-size": 4,
|
||||
"max-num-seqs": 16,
|
||||
"max-num-batched-tokens": 32768,
|
||||
},
|
||||
patches,
|
||||
)
|
||||
|
||||
def test_screening_matrix_can_use_tp_dp_redistribution(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
study_path = _write_interaction_study(Path(tmp))
|
||||
payload = json.loads(study_path.read_text(encoding="utf-8"))
|
||||
payload["engine"]["base_flags"]["data-parallel-size"] = 2
|
||||
payload["engine"]["tunable_flags"].append("data-parallel-size")
|
||||
payload["engine"]["topology_constraints"] = {
|
||||
"require_tp_dp_product_equals_gpu_count": True,
|
||||
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
|
||||
"allowed_data_parallel_sizes": [1, 2, 4, 8],
|
||||
}
|
||||
study_path.write_text(json.dumps(payload), encoding="utf-8")
|
||||
study = load_study_spec(study_path)
|
||||
window, requests = load_trace_requests(study, study_spec_path=study_path)
|
||||
|
||||
matrix = build_interaction_screening_matrix(
|
||||
study=study,
|
||||
window_summary=summarize_window(requests, window),
|
||||
)
|
||||
|
||||
self.assertEqual(matrix["axes"]["tp"]["knob"], "topology")
|
||||
self.assertEqual(
|
||||
matrix["axes"]["tp"]["levels"],
|
||||
{
|
||||
"low": {"tensor-parallel-size": 2, "data-parallel-size": 4},
|
||||
"high": {"tensor-parallel-size": 4, "data-parallel-size": 2},
|
||||
},
|
||||
)
|
||||
self.assertIn(
|
||||
{
|
||||
"tensor-parallel-size": 4,
|
||||
"data-parallel-size": 2,
|
||||
"max-num-seqs": 16,
|
||||
"max-num-batched-tokens": 32768,
|
||||
},
|
||||
[item["config_patch"]["flag_patch"] for item in matrix["configs"]],
|
||||
)
|
||||
|
||||
def test_cli_profile_interaction_matrix_prints_json(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
study_path = _write_interaction_study(Path(tmp))
|
||||
stdout = io.StringIO()
|
||||
|
||||
with contextlib.redirect_stdout(stdout):
|
||||
rc = cli_main(["profile", "interaction-matrix", "--spec", str(study_path)])
|
||||
|
||||
self.assertEqual(rc, 0)
|
||||
payload = json.loads(stdout.getvalue())
|
||||
self.assertEqual(payload["matrix_kind"], "low_high_factorial_screening")
|
||||
self.assertEqual(payload["source"]["window_id"], "chat_w1")
|
||||
self.assertEqual(len(payload["configs"]), 8)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user