Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion benchpress/config/jobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,7 @@
- '--warmup-timeout-buffer={warmup_timeout_buffer}'
- '--test-timeout-buffer={test_timeout_buffer}'
- '--postprocessing-timeout-buffer={postprocessing_timeout_buffer}'
- '--poll-interval={poll_interval}'
vars:
- 'num_servers=0'
- 'memsize=0.5'
Expand All @@ -1043,7 +1044,8 @@
- 'num_client_threads=0'
- 'warmup_timeout_buffer=0'
- 'test_timeout_buffer=0'
- 'postprocessing_timeout_buffer=50'
- 'postprocessing_timeout_buffer=60'
- 'poll_interval=0.2'
hooks:
- hook: copymove
options:
Expand Down
File renamed without changes.
7 changes: 7 additions & 0 deletions packages/tao_bench/args_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ def add_common_server_args(server_parser: ArgumentParser) -> List[Tuple[str, str
default=60,
help="extra time buffer for server to complete postprocessing in seconds",
)
server_parser.add_argument(
"--poll-interval",
type=float,
default=0,
help="poll interval in seconds for process completion detection; "
+ "if > 0, use polling mechanism instead of fixed timeout",
)
server_parser.add_argument("--real", action="store_true", help="for real")

return get_opt_strings(server_parser)
Expand Down
4 changes: 1 addition & 3 deletions packages/tao_bench/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

import args_utils

# Add parent directory to path to import diagnosis_utils
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
from diagnosis_utils import check_port_available, DiagnosisRecorder

sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common"))
import breakdown_utils
from diagnosis_utils import check_port_available, DiagnosisRecorder


BENCHPRESS_ROOT = pathlib.Path(os.path.abspath(__file__)).parents[2]
Expand Down
221 changes: 200 additions & 21 deletions packages/tao_bench/run_autoscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
import subprocess

import sys
import time
from datetime import datetime
from parser import TaoBenchParser

import args_utils

# Add parent directory to path to import diagnosis_utils
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common"))
from diagnosis_utils import DiagnosisRecorder


Expand Down Expand Up @@ -335,30 +336,130 @@ def run_server(args):
procs.append(p)
# wait for servers to finish - add extra time to make sure
# post-processing will finish
timeout = (
args_utils.get_warmup_time(args)
+ args.test_time
+ args.timeout_buffer
+ args.postprocessing_timeout_buffer
)
for p in procs:
try:
(out, err) = p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
# Kill the entire process group
if args.poll_interval > 0:
# Instead of one fixed timeout, wait out the base period, then poll the log files until they stop changing
# First wait for the base timeout (warmup + test + timeout_buffer)
base_timeout = (
args_utils.get_warmup_time(args) + args.test_time + args.timeout_buffer
)

print(f"Waiting {base_timeout}s for processes to complete normally...")
time.sleep(base_timeout)

# Poll for additional postprocessing_timeout_buffer time
# Check if log files are still being written to
print(
f"Polling for up to {args.postprocessing_timeout_buffer}s for processes to finish writing output..."
)
start_time = time.time()

# Track file sizes to detect when processes stop writing
# Dict: logpath -> (last_size, stable_count)
file_states = {server[2]: (0, 0) for server in servers}
stable_threshold = (
3 # Number of consecutive stable checks before considering done
)

while time.time() - start_time < args.postprocessing_timeout_buffer:
all_stable = True

for server in servers:
logpath = server[2]
try:
# Flush the file buffer to ensure size is up-to-date
server[1].flush()
os.fsync(server[1].fileno())

current_size = os.path.getsize(logpath)
last_size, stable_count = file_states[logpath]

if current_size == last_size and current_size != 0:
# File size hasn't changed, increment stable count
stable_count += 1
file_states[logpath] = (current_size, stable_count)

if stable_count < stable_threshold:
all_stable = False
else:
# File size changed since the last check, or the file is still empty; reset the stable count
file_states[logpath] = (current_size, 0)
all_stable = False

except (OSError, ValueError):
# File might be closed or inaccessible, consider it stable
pass

if all_stable:
elapsed = time.time() - start_time
print(f"All log files stable after {elapsed:.2f}s of polling")
break

time.sleep(args.poll_interval)
else:
# Timeout reached - kill any remaining processes
total_time = base_timeout + args.postprocessing_timeout_buffer
print(
f"Timeout reached after {total_time}s (base: {base_timeout}s + polling: {args.postprocessing_timeout_buffer}s), killing remaining processes"
)
for p in procs:
if p.poll() is None: # Process still running
try:
os.killpg(os.getpgid(p.pid), 9)
except (ProcessLookupError, PermissionError):
pass

# Ensure all processes are collected and output is flushed
for p in procs:
try:
os.killpg(os.getpgid(p.pid), 9)
except ProcessLookupError:
pass # Process already terminated
(out, err) = p.communicate()
finally:
# Ensure cleanup even if process completed successfully
p.communicate(timeout=1)
except subprocess.TimeoutExpired:
pass

# Explicitly flush all log file handles to ensure data is written to disk
for server in servers:
try:
server[1].flush() # Flush the file buffer
os.fsync(server[1].fileno()) # Force OS to write to disk
except (OSError, ValueError):
pass # Handle already closed files or invalid file descriptors

# Final cleanup to ensure process groups are terminated
for p in procs:
try:
os.killpg(os.getpgid(p.pid), 9)
except (ProcessLookupError, PermissionError):
pass # Process already terminated or we don't have permission
else:
# Original behavior with fixed timeout
timeout = (
args_utils.get_warmup_time(args)
+ args.test_time
+ args.timeout_buffer
+ args.postprocessing_timeout_buffer
)

for p in procs:
try:
(out, err) = p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
# Kill the entire process group
try:
os.killpg(os.getpgid(p.pid), 9)
except ProcessLookupError:
pass # Process already terminated
(out, err) = p.communicate()
finally:
# Ensure cleanup even if process completed successfully
try:
os.killpg(os.getpgid(p.pid), 9)
except (ProcessLookupError, PermissionError):
pass # Process already terminated or we don't have permission
for server in servers:
server[1].close()

# Initialize diagnosis recorder for detailed logging
recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT))

# parse results
results = []
overall = {
Expand All @@ -375,14 +476,92 @@ def run_server(args):
overall["latency(ms)"] = latency
overall["bandwidth"] = bandwidth

# Diagnose log files and parsing
for i in range(args.num_servers):
logpath = servers[i][2]
returncode = procs[i].returncode

# Check if log file exists and get size
if not os.path.exists(logpath):
recorder.record_failure(
benchmark="tao_bench",
error_type="log_file_missing",
reason=f"Log file does not exist: {logpath}",
metadata={"server_index": i, "logpath": logpath},
)
print(f"ERROR: Log file does not exist for server {i}: {logpath}")
continue

log_size = os.path.getsize(logpath)
print(
f"Server {i}: Log file size = {log_size} bytes, return code = {returncode}"
)

if log_size == 0:
recorder.record_failure(
benchmark="tao_bench",
error_type="empty_log_file",
reason=f"Log file is empty: {logpath}",
metadata={
"server_index": i,
"logpath": logpath,
"returncode": returncode,
},
)
print(f"ERROR: Log file is empty for server {i}: {logpath}")
continue

# Parse the log file
with open(logpath, "r") as log:
log_content = log.read()
log.seek(0) # Reset to beginning for parser

# Show first few lines for debugging
preview_lines = log_content.split("\n")[:5]
print(f"Server {i} log preview (first 5 lines):")
for line in preview_lines:
print(f" {line}")

parser = TaoBenchParser(f"server_{i}.csv")
res = parser.parse(log, None, procs[i].returncode)
if "role" in res and res["role"] == "server":
res = parser.parse(log, None, returncode)

# Diagnose parser results
print(
f"Server {i} parsed result: role={res.get('role', 'UNKNOWN')}, "
f"fast_qps={res.get('fast_qps', 0)}, slow_qps={res.get('slow_qps', 0)}, "
f"num_data_points={res.get('num_data_points', 0)}"
)

if "role" not in res:
recorder.record_failure(
benchmark="tao_bench",
error_type="missing_role_in_result",
reason=f"Parser result missing 'role' field for server {i}",
metadata={
"server_index": i,
"logpath": logpath,
"result": str(res),
},
)
print(f"ERROR: Parser result missing 'role' field for server {i}")
elif res["role"] != "server":
recorder.record_failure(
benchmark="tao_bench",
error_type="incorrect_role",
reason=f"Parser result has incorrect role: {res['role']} (expected 'server')",
metadata={
"server_index": i,
"logpath": logpath,
"role": res["role"],
},
)
print(
f"ERROR: Parser result has role '{res['role']}' instead of 'server' for server {i}"
)
else:
# Valid server result
results.append(res)
recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT))
print(f"Server {i}: Successfully parsed and added to results")
recorder.merge_failure_to_results(
results_dict=overall,
)
Expand Down
4 changes: 4 additions & 0 deletions packages/tao_bench/run_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ def launch_server(port_number_start=11211, bind_cpu=1, bind_mem=1):
if hasattr(args, "num_slow_threads") and args.num_slow_threads > 0:
script_args["--num-slow-threads"] = args.num_slow_threads

# Add poll_interval if specified
if hasattr(args, "poll_interval") and args.poll_interval > 0:
script_args["--poll-interval"] = args.poll_interval

cmd = [f"{TAO_BENCH_DIR}/run_autoscale.py --real"]

for argname, argval in script_args.items():
Expand Down