756 lines
30 KiB
Python
756 lines
30 KiB
Python
import argparse
|
|
import asyncio
|
|
import json
|
|
import queue
|
|
import random
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import numpy as np
|
|
import requests
|
|
from tqdm.asyncio import tqdm
|
|
|
|
from sglang.bench_serving import RequestFuncOutput
|
|
from sglang.benchmark.datasets.random import sample_random_requests
|
|
from sglang.benchmark.utils import get_tokenizer
|
|
from sglang.test.kits.cache_hit_kit import (
|
|
async_request_openai_chat_completions,
|
|
async_request_sglang_generate,
|
|
gen_payload,
|
|
gen_payload_openai,
|
|
)
|
|
|
|
|
|
def parse_args():
    """Parse the command-line options of the multi-turn benchmark.

    Returns:
        argparse.Namespace with one attribute per option below.
    """
    parser = argparse.ArgumentParser(
        description="Script to benchmark concurrent requests to a server."
    )
    parser.add_argument(
        "--num-clients",
        type=int,
        default=256,
        help="Number of concurrent clients",
    )
    parser.add_argument(
        "--max-parallel",
        type=int,
        default=128,
        help="Maximum number of parallel requests",
    )
    parser.add_argument(
        "--request-length",
        type=int,
        default=512,
        help="Length of each new request",
    )
    parser.add_argument(
        "--output-length",
        type=int,
        default=64,
        help="Length of each output",
    )
    parser.add_argument(
        "--num-rounds",
        type=int,
        default=5,
        help="Number of rounds per client",
    )
    parser.add_argument(
        "--distribution",
        type=str,
        default="poisson",
        choices=["poisson", "uniform"],
        help="Distribution type for request intervals (poisson or uniform)",
    )
    parser.add_argument(
        "--request-rate",
        type=float,
        default=1.0,
        help="Average number of requests per second",
    )
    parser.add_argument(
        "--host",
        type=str,
        default="localhost",
        help="Server hostname or IP (default: localhost)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=30000,
        help="Server port (default: 30000)",
    )
    parser.add_argument(
        "--model-path",
        type=str,
        default="meta-llama/Llama-3.1-8B-Instruct",
        help="model path compatible with Hugging Face Transformers",
    )
    parser.add_argument(
        "--dataset-path",
        type=str,
        default="",
        help="local dataset to sample tokens from",
    )
    parser.add_argument(
        "--log-file",
        type=str,
        default="performance_metrics.jsonl",
        help="File to log performance metrics",
    )
    parser.add_argument(
        "--disable-auto-run",
        action="store_true",
        help="If set, disable automatically testing with a range of request rates.",
    )
    parser.add_argument(
        "--disable-random-sample",
        action="store_true",
        help="If set, disable random sampling of requests from the ShareGPT dataset.",
    )
    parser.add_argument(
        "--enable-round-barrier",
        action="store_true",
        help="If set, only send i-th turn requests after all (i-1)-th turn requests finished.",
    )
    parser.add_argument(
        "--sub-question-input-length",
        type=int,
        default=0,
        help="Length of the sub question input for each request, if set 0 use request_length",
    )
    # Restrict to the policies ReadyQueue implements. Without this, a typo is only
    # detected on the first pop() inside the sender thread, which then dies while
    # the response handler keeps waiting forever.
    parser.add_argument(
        "--ready-queue-policy",
        type=str,
        default="random",
        choices=["random", "fifo"],
        help="Policy for popping requests from the ready queue (random or fifo)",
    )
    parser.add_argument(
        "--tag",
        type=str,
        default="",
        help="Tag of a certain run in the log file",
    )
    parser.add_argument(
        "--min-rounds",
        type=int,
        default=0,
        help="Min rounds per client (0 = use --num-rounds)",
    )
    parser.add_argument(
        "--max-rounds",
        type=int,
        default=0,
        help="Max rounds per client (0 = use --num-rounds)",
    )
    parser.add_argument(
        "--range-ratio",
        type=float,
        default=1.0,
        help="Length variation ratio for prompts and outputs (1.0 = no variation, 0.5 = 50%% variation)",
    )
    parser.add_argument("--seed", type=int, default=1, help="The random seed.")
    parser.add_argument(
        "--lora-path",
        type=str,
        default="",
        help="String of LoRA path. Currently we only support benchmarking on a single LoRA adaptor.",
    )
    parser.add_argument(
        "--api-format",
        type=str,
        default="sglang",
        choices=["sglang", "openai"],
        help="API format to use: 'sglang' for native /generate endpoint, "
        "'openai' for OpenAI-compatible /v1/chat/completions endpoint.",
    )
    return parser.parse_args()
|
|
|
|
|
|
def log_to_jsonl_file(data, file_path="performance_metrics.jsonl", tag=""):
    """Append ``data`` to a JSONL file as one line, prefixed by a timestamp and tag.

    The record starts with ``timestamp`` (local ISO-8601) and ``tag``, followed
    by the keys of ``data``; keys in ``data`` win on collision. An ``IOError``
    while opening or writing is reported on stdout rather than raised.
    """
    record = dict(timestamp=datetime.now().isoformat(), tag=tag)
    record.update(data)

    try:
        with open(file_path, "a") as sink:
            # One JSON document per line (JSON Lines format).
            sink.write(json.dumps(record) + "\n")
    except IOError as err:
        print(f"Error writing to JSONL file: {err}")
|
|
|
|
|
|
class ReadyQueue:
    """
    Thread-safe queue that can pop requests in different orders based on given policy.

    Supported policies:
      * ``"random"`` -- pop a uniformly random element (module-level ``random`` state).
      * ``"fifo"``   -- pop elements in insertion order.
    All operations hold ``self.lock``.
    """

    _POLICIES = ("random", "fifo")

    def __init__(self, init_requests=None, policy="random"):
        # Fail fast on an unknown policy instead of on the first pop(), which
        # runs inside the request-sender thread.
        if policy not in self._POLICIES:
            raise ValueError(f"{policy} not implemented")
        self.lock = threading.Lock()
        # Copy so the queue never mutates (or is mutated through) the caller's list.
        self.requests = list(init_requests) if init_requests else []
        self.policy = policy

    def append(self, item):
        """Add ``item`` to the back of the queue."""
        with self.lock:
            self.requests.append(item)

    def pop(self):
        """Remove and return one item according to the policy, or None if empty.

        Raises:
            ValueError: if ``self.policy`` was changed to an unsupported value.
        """
        with self.lock:
            if not self.requests:
                return None
            if self.policy == "random":
                index = random.randrange(len(self.requests))
                return self.requests.pop(index)
            elif self.policy == "fifo":
                return self.requests.pop(0)
            else:
                # todo, varying thinking time of clients
                raise ValueError(f"{self.policy} not implemented")
|
|
|
|
|
|
class WorkloadGenerator:
    """Multi-turn, multi-client workload driver for benchmarking a serving endpoint.

    Every client starts from a sampled prompt. After each successful response the
    model output and a freshly sampled sub-question are appended to the client's
    history and the next round is queued, until the client's round budget
    (``client_total_rounds``) is used up. :meth:`request_sender` dispatches queued
    requests from an asyncio loop on one thread, :meth:`response_handler`
    post-processes responses on another, and :meth:`run` starts both and returns
    the aggregated metrics.
    """

    def __init__(self, args):
        """Build the initial per-client state from the parsed CLI ``args``.

        Raises:
            ValueError: if the resolved min/max round counts are inconsistent.
        """
        self.api_format = args.api_format
        self.model_path = args.model_path

        # Construct the base URL and select request/payload functions
        if self.api_format == "openai":
            self.url = f"http://{args.host}:{args.port}/v1/chat/completions"
            self.request_func = async_request_openai_chat_completions
        else:
            self.url = f"http://{args.host}:{args.port}/generate"
            self.request_func = async_request_sglang_generate

        self.tokenizer = get_tokenizer(args.model_path)
        self.distribution = args.distribution
        self.request_rate = args.request_rate
        self.start_time = None
        self.finished_time = None
        self.lora_path = args.lora_path

        # sent_requests is written only by the sender thread; completed_requests
        # (successes and failures) and skipped_requests (rounds never sent because
        # their client failed earlier) only by the response-handler thread.
        self.sent_requests = 0
        self.completed_requests = 0
        self.skipped_requests = 0

        # Resolve per-client round counts
        min_rounds = args.min_rounds
        max_rounds = args.max_rounds
        if min_rounds == 0 and max_rounds == 0:
            # Backward compat: all clients use --num-rounds
            min_rounds = args.num_rounds
            max_rounds = args.num_rounds
        elif min_rounds == 0:
            min_rounds = max_rounds
        elif max_rounds == 0:
            max_rounds = min_rounds
        if min_rounds < 1:
            raise ValueError(f"--min-rounds must be >= 1, got {min_rounds}")
        if min_rounds > max_rounds:
            raise ValueError(
                f"--min-rounds ({min_rounds}) must be <= --max-rounds ({max_rounds})"
            )

        self.min_rounds = min_rounds
        self.max_rounds = max_rounds

        if min_rounds == max_rounds:
            # All clients have the same round count; skip randint to preserve random state
            self.client_total_rounds = [min_rounds] * args.num_clients
        else:
            self.client_total_rounds = [
                random.randint(min_rounds, max_rounds) for _ in range(args.num_clients)
            ]

        # clients_per_round[r] = number of clients participating in round r
        self.clients_per_round = [
            sum(1 for t in self.client_total_rounds if t > r) for r in range(max_rounds)
        ]
        # Barrier targets: start equal to clients_per_round but are decremented
        # when a client drops out after a failure, so later barriers still release.
        self._barrier_expected = list(self.clients_per_round)
        self.total_requests = sum(self.client_total_rounds)

        range_ratio = args.range_ratio

        # Use return_text=False to get token ids instead of text
        first_round_samples = sample_random_requests(
            input_len=args.request_length,
            output_len=args.output_length,
            num_prompts=args.num_clients,
            range_ratio=range_ratio,
            tokenizer=self.tokenizer,
            dataset_path=args.dataset_path,
            random_sample=not args.disable_random_sample,
            return_text=False,
        )
        # Store per-sample output_len for first round
        first_round_output_lens = [row.output_len for row in first_round_samples]
        # row.prompt holds token ids when return_text=False
        self.candidate_inputs = [list(i.prompt) for i in first_round_samples]

        if args.sub_question_input_length != 0:
            sub_question_input_length = args.sub_question_input_length
        else:
            sub_question_input_length = args.request_length

        # Every round after a client's first consumes exactly one sub-question.
        num_sub_questions = sum(max(t - 1, 0) for t in self.client_total_rounds)

        self.sub_question_inputs = sample_random_requests(
            input_len=sub_question_input_length,
            output_len=args.output_length,
            num_prompts=max(num_sub_questions, 1),
            range_ratio=range_ratio,
            tokenizer=self.tokenizer,
            dataset_path=args.dataset_path,
            random_sample=not args.disable_random_sample,
            return_text=False,
        )

        if self.api_format == "openai":
            # OpenAI mode: history is a messages list for /v1/chat/completions
            initial_messages = {
                i: [
                    {
                        "role": "user",
                        "content": self.tokenizer.decode(self.candidate_inputs[i]),
                    }
                ]
                for i in range(args.num_clients)
            }
            init_requests = [
                (
                    i,
                    gen_payload_openai(
                        initial_messages[i],
                        first_round_output_lens[i],
                        self.model_path,
                    ),
                )
                for i in range(args.num_clients)
            ]
            self.client_records = {
                i: {
                    "round": 0,
                    "history": initial_messages[i],
                    "total_rounds": self.client_total_rounds[i],
                }
                for i in range(args.num_clients)
            }
        else:
            # SGLang mode: history is List[int] (token ids)
            init_requests = [
                (
                    i,
                    gen_payload(
                        self.candidate_inputs[i],
                        first_round_output_lens[i],
                        args.lora_path,
                    ),
                )
                for i in range(args.num_clients)
            ]
            self.client_records = {
                i: {
                    "round": 0,
                    "history": list(self.candidate_inputs[i]),
                    "total_rounds": self.client_total_rounds[i],
                }
                for i in range(args.num_clients)
            }
        self.ready_queue = ReadyQueue(
            init_requests=init_requests, policy=args.ready_queue_policy
        )
        self.candidate_inputs = self.candidate_inputs[args.num_clients :]

        self.response_queue = queue.Queue()
        self.pbar = tqdm(total=self.total_requests)
        self.performance_metrics = {
            "ttft": [],
            "itl": [],
            "latency": [],
            "prompt_len": [],
            "cached_tokens": [],
            "generated_len": [],
        }
        self.enable_round_barrier = args.enable_round_barrier
        if self.enable_round_barrier:
            # Add round-specific metrics while preserving the original structure
            for i in range(self.max_rounds):
                self.performance_metrics[f"round_{i}"] = {
                    "ttft": [],
                    "latency": [],
                    "prompt_len": [],
                    "cached_tokens": [],
                    "generated_len": [],
                }
        self.num_clients = args.num_clients

        self.num_rounds = self.max_rounds
        self.max_parallel = args.max_parallel
        self.output_length = args.output_length

    # ------------------------------------------------------------------ helpers

    def _all_accounted(self):
        """Return True once every planned request was either answered or skipped."""
        return self.completed_requests + self.skipped_requests >= self.total_requests

    def _skip_remaining_rounds(self, client_id, failed_round):
        """Drop the rounds after ``failed_round`` for a client whose request failed.

        They will never be sent, so they are counted as skipped (letting the run
        terminate) and removed from the later round-barrier targets.
        """
        future_rounds = range(
            failed_round + 1, self.client_records[client_id]["total_rounds"]
        )
        for round_idx in future_rounds:
            self._barrier_expected[round_idx] -= 1
        self.skipped_requests += len(future_rounds)

    def _record_metrics(self, response, round_idx):
        """Append one successful response's stats to the global and per-round metrics."""
        metrics = self.performance_metrics
        metrics["ttft"].append(response.ttft)
        metrics["itl"].extend(response.itl)
        metrics["latency"].append(response.latency)
        metrics["prompt_len"].append(response.prompt_len)
        metrics["cached_tokens"].append(response.cached_tokens)
        metrics["generated_len"].append(response.generated_len)
        if self.enable_round_barrier:
            per_round = metrics[f"round_{round_idx}"]
            for key in ("ttft", "latency", "prompt_len", "cached_tokens", "generated_len"):
                per_round[key].append(getattr(response, key))

    def _build_next_request(self, client_id):
        """Append a fresh sub-question to the client's history and return its request."""
        history = self.client_records[client_id]["history"]
        sub_q = self.sub_question_inputs.pop()
        if self.api_format == "openai":
            # Append sub-question as a new user message
            sub_q_text = self.tokenizer.decode(list(sub_q.prompt))
            history.append({"role": "user", "content": sub_q_text})
            payload = gen_payload_openai(history, sub_q.output_len, self.model_path)
        else:
            # Append sub-question token ids to client's history
            history.extend(list(sub_q.prompt))
            payload = gen_payload(history, sub_q.output_len, self.lora_path)
        return (client_id, payload)

    def _advance_client(self, client_id, response, current_round):
        """Record a successful response and build the client's next request.

        Returns the ``(client_id, payload)`` tuple for the next round, or None when
        the client has used up its round budget.
        """
        record = self.client_records[client_id]
        # Extend history with response
        if self.api_format == "openai":
            if response.generated_text:
                record["history"].append(
                    {"role": "assistant", "content": response.generated_text}
                )
        else:
            record["history"].extend(response.output_ids)
        record["round"] += 1
        self._record_metrics(response, current_round)

        if record["round"] >= record["total_rounds"]:
            return None
        return self._build_next_request(client_id)

    # ------------------------------------------------------------ worker threads

    async def handle_request(self, item):
        """Send one ``(client_id, payload)`` request and enqueue its response.

        Any exception from the request function is turned into a failed
        ``RequestFuncOutput`` so the response handler still accounts for it.
        """
        client_id, payload = item
        try:
            response = await self.request_func(payload, self.url, self.pbar)
            if self.pbar.n == self.pbar.total:
                self.finished_time = time.perf_counter()
            self.response_queue.put((client_id, response))
        except Exception as e:
            print(f"Request failed for client {client_id}: {e}")
            failed_response = RequestFuncOutput()
            failed_response.success = False
            failed_response.error = str(e)
            self.response_queue.put((client_id, failed_response))

    def request_sender(self):
        """Dispatch ready requests from a private asyncio event loop (sender thread).

        At most ``max_parallel`` requests are in flight. Between dispatch attempts
        the loop sleeps for an interval drawn from ``self.distribution`` with mean
        ``1 / request_rate``. The loop exits once every request is accounted for.
        """

        async def request_loop():
            # asyncio keeps only weak references to tasks; hold strong ones so
            # in-flight requests cannot be garbage-collected mid-execution.
            in_flight = set()
            while True:
                if self.sent_requests - self.completed_requests >= self.max_parallel:
                    # Concurrency cap reached: poll until a slot frees up.
                    await asyncio.sleep(0.05)
                    continue

                new_request = self.ready_queue.pop()
                if new_request is not None:
                    task = asyncio.create_task(self.handle_request(new_request))
                    in_flight.add(task)
                    task.add_done_callback(in_flight.discard)
                    self.sent_requests += 1

                if self._all_accounted():
                    break

                # Calculate Poisson-distributed wait time
                if self.distribution == "poisson":
                    sleep_time = random.expovariate(self.request_rate)
                elif self.distribution == "uniform":
                    avg_interval = (
                        1.0 / self.request_rate if self.request_rate > 0 else 1.0
                    )
                    sleep_time = random.uniform(0, 2 * avg_interval)
                else:
                    raise ValueError("Invalid distribution type")
                await asyncio.sleep(sleep_time)  # Wait before sending the next request

            if in_flight:
                await asyncio.gather(*in_flight, return_exceptions=True)

        # Create and run the event loop for asynchronous requests
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(request_loop())
        finally:
            loop.close()

    def response_handler(self):
        """Consume responses, record metrics and schedule each client's next round.

        A failed response (or a ValueError while processing a successful one)
        ends that client's conversation. Its remaining rounds are counted as
        skipped, so the run and any round barrier can still complete instead of
        waiting forever.
        """
        next_round_reqs = []
        current_barrier_round = 0
        barrier_round_completed = 0
        while True:
            try:
                # Block until response is available
                client_id, response = self.response_queue.get(timeout=10)
            except queue.Empty:
                if self._all_accounted():
                    break
                continue

            current_round = self.client_records[client_id]["round"]
            new_req = None
            succeeded = response.success
            if succeeded:
                try:
                    new_req = self._advance_client(client_id, response, current_round)
                except ValueError as e:
                    print(f"Error processing response for client {client_id}: {e}")
                    succeeded = False
            else:
                print(f"Request failed for client {client_id}: {response.error}")

            if not succeeded:
                self._skip_remaining_rounds(client_id, current_round)
            self.completed_requests += 1
            if self.finished_time is None and self._all_accounted():
                self.finished_time = time.perf_counter()

            if new_req is not None:
                if self.enable_round_barrier:
                    next_round_reqs.append(new_req)
                else:
                    self.ready_queue.append(new_req)

            # Barrier logic: release next round when all clients for
            # current barrier round have completed
            if self.enable_round_barrier and current_barrier_round < self.max_rounds:
                barrier_round_completed += 1
                expected = self._barrier_expected[current_barrier_round]
                if barrier_round_completed == expected:
                    print(
                        f"\n Barrier: round {current_barrier_round} complete "
                        f"({expected} clients), releasing {len(next_round_reqs)} "
                        f"requests for round {current_barrier_round + 1}"
                    )
                    # After the final round there is nothing to release, so skip
                    # the heartbeat and the 10 s pause.
                    if next_round_reqs:
                        self._send_heartbeat(input_len=100, output_len=100)
                        time.sleep(10)
                        for req in next_round_reqs:
                            self.ready_queue.append(req)
                    next_round_reqs = []
                    current_barrier_round += 1
                    barrier_round_completed = 0

    def _send_heartbeat(self, input_len=100, output_len=20):
        """Send a small blocking request of ``input_len`` dummy tokens to the server.

        The payload matches ``self.api_format``. The native payload is not valid
        for the chat-completions endpoint. Failures are reported and ignored.
        """
        heartbeat_input = [1] * input_len
        if self.api_format == "openai":
            messages = [
                {"role": "user", "content": self.tokenizer.decode(heartbeat_input)}
            ]
            payload = gen_payload_openai(messages, output_len, self.model_path)
        else:
            payload = gen_payload(heartbeat_input, output_len, self.lora_path)
        try:
            resp = requests.post(self.url, json=payload, timeout=30)
            if not resp.ok:
                print(f"Heartbeat request returned HTTP {resp.status_code}")
        except Exception as e:
            print(f"Heartbeat request failed: {e}")

    # --------------------------------------------------------------- reporting

    @staticmethod
    def _percentile(sorted_vals, q):
        """Return the element at index floor(q * len), clamped; 0.0 when empty."""
        if not sorted_vals:
            return 0.0
        idx = min(int(q * len(sorted_vals)), len(sorted_vals) - 1)
        return sorted_vals[idx]

    @staticmethod
    def _max_or_zero(sorted_vals):
        """Return the last (largest) element of a sorted list, or 0.0 when empty."""
        return sorted_vals[-1] if sorted_vals else 0.0

    @staticmethod
    def _mean(values):
        """Arithmetic mean, or 0.0 for an empty list (e.g. when every request failed)."""
        return sum(values) / len(values) if values else 0.0

    @staticmethod
    def _per_second(amount, duration):
        """Return ``amount / duration``, or 0.0 for a non-positive duration."""
        return amount / duration if duration > 0 else 0.0

    @staticmethod
    def _hit_rate(cached_tokens, prompt_lens):
        """Return cached prompt tokens divided by total prompt tokens (0 if no prompt tokens)."""
        total_prompt = sum(prompt_lens)
        return 0 if total_prompt == 0 else sum(cached_tokens) / total_prompt

    def _build_performance_data(self, duration):
        """Aggregate the collected metrics into the summary dict returned by run()."""
        metrics = self.performance_metrics
        sorted_ttft = sorted(metrics["ttft"])
        sorted_latency = sorted(metrics["latency"])
        sorted_itl = sorted(metrics["itl"])
        sorted_prompt_len = sorted(metrics["prompt_len"])
        sorted_output_len = sorted(metrics["generated_len"])
        pct = self._percentile

        summary = {
            "total_requests": len(metrics["ttft"]),
            "request_rate": self.request_rate,
            "average_prompt_len": self._mean(metrics["prompt_len"]),
            "average_output_len": self._mean(metrics["generated_len"]),
            "p90_prompt_len": pct(sorted_prompt_len, 0.9),
            "p99_prompt_len": pct(sorted_prompt_len, 0.99),
            "p90_output_len": pct(sorted_output_len, 0.9),
            "p99_output_len": pct(sorted_output_len, 0.99),
            "average_ttft": self._mean(metrics["ttft"]),
            "p90_ttft": pct(sorted_ttft, 0.9),
            "p99_ttft": pct(sorted_ttft, 0.99),
            "median_ttft": pct(sorted_ttft, 0.5),
            "max_ttft": self._max_or_zero(sorted_ttft),
            "average_itl": self._mean(metrics["itl"]),
            "p90_itl": pct(sorted_itl, 0.9),
            "p99_itl": pct(sorted_itl, 0.99),
            "median_itl": pct(sorted_itl, 0.5),
            "max_itl": self._max_or_zero(sorted_itl),
            "average_latency": self._mean(metrics["latency"]),
            "p90_latency": pct(sorted_latency, 0.9),
            "p99_latency": pct(sorted_latency, 0.99),
            "median_latency": pct(sorted_latency, 0.5),
            "max_latency": self._max_or_zero(sorted_latency),
            "input_token_throughput": self._per_second(
                sum(metrics["prompt_len"]), duration
            ),
            "output_token_throughput": self._per_second(
                sum(metrics["generated_len"]), duration
            ),
            # Successful requests only: failed and never-sent (skipped) requests
            # must not inflate throughput. Equals total_requests when nothing fails.
            "throughput": self._per_second(len(metrics["latency"]), duration),
            "cache_hit_rate": self._hit_rate(
                metrics["cached_tokens"], metrics["prompt_len"]
            ),
        }
        performance_data = {"summary": summary}

        if self.enable_round_barrier:
            performance_data["round"] = {}
            for round_num in range(self.num_rounds):
                round_key = f"round_{round_num}"
                round_metrics = metrics[round_key]
                performance_data["round"][round_key] = {
                    "average_ttft": (
                        sum(round_metrics["ttft"]) / len(round_metrics["ttft"])
                        if round_metrics["ttft"]
                        else 0
                    ),
                    "cache_hit_rate": self._hit_rate(
                        round_metrics["cached_tokens"], round_metrics["prompt_len"]
                    ),
                    "request_count": len(round_metrics["ttft"]),
                }
        return performance_data

    def _print_performance_data(self, performance_data):
        """Print the human-readable summary (and the per-round table if enabled)."""
        s = performance_data["summary"]
        print("All requests completed")
        print("Performance metrics summary:")
        print(
            f" Total requests: {s['total_requests']} at {s['request_rate']} requests per second"
        )
        print(f" Average Prompt Length: {s['average_prompt_len']:.2f} tokens")
        print(f" Average Output Length: {s['average_output_len']:.2f} tokens")
        print(f" P90 Prompt Length: {s['p90_prompt_len']:.0f} tokens")
        print(f" P99 Prompt Length: {s['p99_prompt_len']:.0f} tokens")
        print(f" P90 Output Length: {s['p90_output_len']:.0f} tokens")
        print(f" P99 Output Length: {s['p99_output_len']:.0f} tokens")
        print(f" Average TTFT: {s['average_ttft']:.2f}")
        print(f" P90 TTFT: {s['p90_ttft']:.2f}")
        print(f" P99 TTFT: {s['p99_ttft']:.2f}")
        print(f" Median TTFT: {s['median_ttft']:.2f}")
        print(f" Max TTFT: {s['max_ttft']:.2f}")
        print(f" Average ITL: {s['average_itl']:.4f}")
        print(f" P90 ITL: {s['p90_itl']:.4f}")
        print(f" P99 ITL: {s['p99_itl']:.4f}")
        print(f" Median ITL: {s['median_itl']:.4f}")
        print(f" Max ITL: {s['max_itl']:.4f}")
        print(f" Average latency: {s['average_latency']:.2f}")
        print(f" P90 latency: {s['p90_latency']:.2f}")
        print(f" P99 latency: {s['p99_latency']:.2f}")
        print(f" Median latency: {s['median_latency']:.2f}")
        print(f" Max latency: {s['max_latency']:.2f}")
        print(
            f" Input token throughput: {s['input_token_throughput']:.2f} tokens per second"
        )
        print(
            f" Output token throughput: {s['output_token_throughput']:.2f} tokens per second"
        )
        print(f" Request Throughput: {s['throughput']:.2f} requests per second")
        print(f" Cache Hit Rate: {s['cache_hit_rate']:.6f}")

        if self.enable_round_barrier:
            # Print round-based summary
            print("Per-round metrics:")
            if "round" in performance_data:
                for round_num in range(self.num_rounds):
                    round_key = f"round_{round_num}"
                    if round_key in performance_data["round"]:
                        round_data = performance_data["round"][round_key]
                        avg_ttft = round_data["average_ttft"]
                        cache_hit_rate = round_data["cache_hit_rate"]
                        request_count = round_data["request_count"]
                        clients_in_round = self.clients_per_round[round_num]
                        print(
                            f" Round {round_num}: Average TTFT = {avg_ttft:.2f}s, "
                            f"Cache Hit Rate = {cache_hit_rate:.6f} "
                            f"({request_count} requests, "
                            f"{clients_in_round} clients)"
                        )
                    else:
                        print(f" Round {round_num}: No requests completed")

    def run(self):
        """Run the whole workload, print a summary and return the metrics dict."""
        request_thread = threading.Thread(target=self.request_sender, daemon=True)
        response_thread = threading.Thread(target=self.response_handler, daemon=True)

        self.start_time = time.perf_counter()
        request_thread.start()
        response_thread.start()

        request_thread.join()
        response_thread.join()
        self.pbar.close()

        if self.finished_time is None:
            # Defensive: never subtract from None if no completion time was recorded.
            self.finished_time = time.perf_counter()
        duration = self.finished_time - self.start_time

        performance_data = self._build_performance_data(duration)
        self._print_performance_data(performance_data)
        return performance_data
|
|
|
|
|
|
def main():
    """Run the benchmark once per request rate and append each result to the log file.

    Before each run the script POSTs to the server's ``/flush_cache`` endpoint
    (intended to clear server-side caching between runs) and waits one second.
    With ``--disable-auto-run`` only ``--request-rate`` is used; otherwise a fixed
    sweep of rates from 16 down to 1 is run.
    """
    args = parse_args()
    flush_cache_url = f"http://{args.host}:{args.port}/flush_cache"

    random.seed(args.seed)
    np.random.seed(args.seed)

    if args.disable_auto_run:
        print("Running with specified request rate...")
        request_rates = [args.request_rate]
    else:
        print("Auto-running with different request rates...")
        request_rates = [16, 14, 12, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]

    for rate in request_rates:
        args.request_rate = rate
        # Bounded timeout so an unresponsive server cannot hang the sweep forever;
        # connection errors still abort, since no benchmark could run anyway.
        flush_response = requests.post(flush_cache_url, timeout=60)
        if not flush_response.ok:
            print(
                f"Warning: {flush_cache_url} returned HTTP "
                f"{flush_response.status_code}; the cache may not have been flushed"
            )
        time.sleep(1)
        performance_data = WorkloadGenerator(args).run()
        log_to_jsonl_file(performance_data, args.log_file, tag=args.tag)


if __name__ == "__main__":
    main()
|