diff --git a/benchpress/config/jobs.yml b/benchpress/config/jobs.yml
index f058dc526..5232d3566 100644
--- a/benchpress/config/jobs.yml
+++ b/benchpress/config/jobs.yml
@@ -1024,6 +1024,7 @@
     - '--warmup-timeout-buffer={warmup_timeout_buffer}'
     - '--test-timeout-buffer={test_timeout_buffer}'
     - '--postprocessing-timeout-buffer={postprocessing_timeout_buffer}'
+    - '--poll-interval={poll_interval}'
   vars:
     - 'num_servers=0'
     - 'memsize=0.5'
@@ -1043,7 +1044,8 @@
     - 'num_client_threads=0'
     - 'warmup_timeout_buffer=0'
     - 'test_timeout_buffer=0'
-    - 'postprocessing_timeout_buffer=50'
+    - 'postprocessing_timeout_buffer=60'
+    - 'poll_interval=0.2'
   hooks:
     - hook: copymove
      options:
diff --git a/packages/diagnosis_utils.py b/packages/common/diagnosis_utils.py
similarity index 100%
rename from packages/diagnosis_utils.py
rename to packages/common/diagnosis_utils.py
diff --git a/packages/tao_bench/args_utils.py b/packages/tao_bench/args_utils.py
index 9d1dec3bc..67c4e787a 100644
--- a/packages/tao_bench/args_utils.py
+++ b/packages/tao_bench/args_utils.py
@@ -159,6 +159,13 @@ def add_common_server_args(server_parser: ArgumentParser) -> List[Tuple[str, str
         default=60,
         help="extra time buffer for server to complete postprocessing in seconds",
     )
+    server_parser.add_argument(
+        "--poll-interval",
+        type=float,
+        default=0,
+        help="poll interval in seconds for process completion detection; "
+        + "if > 0, use polling mechanism instead of fixed timeout",
+    )
     server_parser.add_argument("--real", action="store_true", help="for real")
     return get_opt_strings(server_parser)
diff --git a/packages/tao_bench/run.py b/packages/tao_bench/run.py
index f68ce6bca..5cd39f39e 100755
--- a/packages/tao_bench/run.py
+++ b/packages/tao_bench/run.py
@@ -17,12 +17,10 @@
 
 import args_utils
 
-# Add parent directory to path to import diagnosis_utils
-sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
-from diagnosis_utils import check_port_available, DiagnosisRecorder
 sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common"))
 import breakdown_utils
+from diagnosis_utils import check_port_available, DiagnosisRecorder
 
 BENCHPRESS_ROOT = pathlib.Path(os.path.abspath(__file__)).parents[2]
diff --git a/packages/tao_bench/run_autoscale.py b/packages/tao_bench/run_autoscale.py
index e106135fe..1f8b63a91 100755
--- a/packages/tao_bench/run_autoscale.py
+++ b/packages/tao_bench/run_autoscale.py
@@ -14,13 +14,14 @@
 import subprocess
 import sys
+import time
 from datetime import datetime
 
 from parser import TaoBenchParser
 
 import args_utils
 
 # Add parent directory to path to import diagnosis_utils
-sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
+sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common"))
 from diagnosis_utils import DiagnosisRecorder
@@ -335,30 +336,130 @@ def run_server(args):
         procs.append(p)
     # wait for servers to finish - add extra time to make sure
     # post-processing will finish
-    timeout = (
-        args_utils.get_warmup_time(args)
-        + args.test_time
-        + args.timeout_buffer
-        + args.postprocessing_timeout_buffer
-    )
-    for p in procs:
-        try:
-            (out, err) = p.communicate(timeout=timeout)
-        except subprocess.TimeoutExpired:
-            # Kill the entire process group
+    if args.poll_interval > 0:
+        # Use intelligent process polling instead of fixed timeout
+        # First wait for base timeout (warmup + test + timeout_buffer)
+        base_timeout = (
+            args_utils.get_warmup_time(args) + args.test_time + args.timeout_buffer
+        )
+
+        print(f"Waiting {base_timeout}s for processes to complete normally...")
+        time.sleep(base_timeout)
+
+        # Poll for additional postprocessing_timeout_buffer time
+        # Check if log files are still being written to
+        print(
+            f"Polling for up to {args.postprocessing_timeout_buffer}s for processes to finish writing output..."
+        )
+        start_time = time.time()
+
+        # Track file sizes to detect when processes stop writing
+        # Dict: logpath -> (last_size, stable_count)
+        file_states = {server[2]: (0, 0) for server in servers}
+        stable_threshold = (
+            3  # Number of consecutive stable checks before considering done
+        )
+
+        while time.time() - start_time < args.postprocessing_timeout_buffer:
+            all_stable = True
+
+            for server in servers:
+                logpath = server[2]
+                try:
+                    # Flush the file buffer to ensure size is up-to-date
+                    server[1].flush()
+                    os.fsync(server[1].fileno())
+
+                    current_size = os.path.getsize(logpath)
+                    last_size, stable_count = file_states[logpath]
+
+                    if current_size == last_size and current_size != 0:
+                        # File size hasn't changed, increment stable count
+                        stable_count += 1
+                        file_states[logpath] = (current_size, stable_count)
+
+                        if stable_count < stable_threshold:
+                            all_stable = False
+                    else:
+                        # File is still growing or is zero, reset stable count
+                        file_states[logpath] = (current_size, 0)
+                        all_stable = False
+
+                except (OSError, ValueError):
+                    # File might be closed or inaccessible, consider it stable
+                    pass
+
+            if all_stable:
+                elapsed = time.time() - start_time
+                print(f"All log files stable after {elapsed:.2f}s of polling")
+                break
+
+            time.sleep(args.poll_interval)
+        else:
+            # Timeout reached - kill any remaining processes
+            total_time = base_timeout + args.postprocessing_timeout_buffer
+            print(
+                f"Timeout reached after {total_time}s (base: {base_timeout}s + polling: {args.postprocessing_timeout_buffer}s), killing remaining processes"
+            )
+            for p in procs:
+                if p.poll() is None:  # Process still running
+                    try:
+                        os.killpg(os.getpgid(p.pid), 9)
+                    except (ProcessLookupError, PermissionError):
+                        pass
+
+        # Ensure all processes are collected and output is flushed
+        for p in procs:
             try:
-                os.killpg(os.getpgid(p.pid), 9)
-            except ProcessLookupError:
-                pass  # Process already terminated
-            (out, err) = p.communicate()
-        finally:
-            # Ensure cleanup even if process completed successfully
+                p.communicate(timeout=1)
+            except subprocess.TimeoutExpired:
+                pass
+
+        # Explicitly flush all log file handles to ensure data is written to disk
+        for server in servers:
+            try:
+                server[1].flush()  # Flush the file buffer
+                os.fsync(server[1].fileno())  # Force OS to write to disk
+            except (OSError, ValueError):
+                pass  # Handle already closed files or invalid file descriptors
+
+        # Final cleanup to ensure process groups are terminated
+        for p in procs:
             try:
                 os.killpg(os.getpgid(p.pid), 9)
             except (ProcessLookupError, PermissionError):
                 pass  # Process already terminated or we don't have permission
+    else:
+        # Original behavior with fixed timeout
+        timeout = (
+            args_utils.get_warmup_time(args)
+            + args.test_time
+            + args.timeout_buffer
+            + args.postprocessing_timeout_buffer
+        )
+
+        for p in procs:
+            try:
+                (out, err) = p.communicate(timeout=timeout)
+            except subprocess.TimeoutExpired:
+                # Kill the entire process group
+                try:
+                    os.killpg(os.getpgid(p.pid), 9)
+                except ProcessLookupError:
+                    pass  # Process already terminated
+                (out, err) = p.communicate()
+            finally:
+                # Ensure cleanup even if process completed successfully
+                try:
+                    os.killpg(os.getpgid(p.pid), 9)
+                except (ProcessLookupError, PermissionError):
+                    pass  # Process already terminated or we don't have permission
     for server in servers:
         server[1].close()
+
+    # Initialize diagnosis recorder for detailed logging
+    recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT))
+
     # parse results
     results = []
     overall = {
@@ -375,14 +476,92 @@ def run_server(args):
     overall["latency(ms)"] = latency
     overall["bandwidth"] = bandwidth
+    # Diagnose log files and parsing
     for i in range(args.num_servers):
         logpath = servers[i][2]
+        returncode = procs[i].returncode
+
+        # Check if log file exists and get size
+        if not os.path.exists(logpath):
+            recorder.record_failure(
+                benchmark="tao_bench",
+                error_type="log_file_missing",
+                reason=f"Log file does not exist: {logpath}",
+                metadata={"server_index": i, "logpath": logpath},
+            )
+            print(f"ERROR: Log file does not exist for server {i}: {logpath}")
+            continue
+
+        log_size = os.path.getsize(logpath)
+        print(
+            f"Server {i}: Log file size = {log_size} bytes, return code = {returncode}"
+        )
+
+        if log_size == 0:
+            recorder.record_failure(
+                benchmark="tao_bench",
+                error_type="empty_log_file",
+                reason=f"Log file is empty: {logpath}",
+                metadata={
+                    "server_index": i,
+                    "logpath": logpath,
+                    "returncode": returncode,
+                },
+            )
+            print(f"ERROR: Log file is empty for server {i}: {logpath}")
+            continue
+
+        # Parse the log file
         with open(logpath, "r") as log:
+            log_content = log.read()
+            log.seek(0)  # Reset to beginning for parser
+
+            # Show first few lines for debugging
+            preview_lines = log_content.split("\n")[:5]
+            print(f"Server {i} log preview (first 5 lines):")
+            for line in preview_lines:
+                print(f"  {line}")
+
             parser = TaoBenchParser(f"server_{i}.csv")
-            res = parser.parse(log, None, procs[i].returncode)
-            if "role" in res and res["role"] == "server":
+            res = parser.parse(log, None, returncode)
+
+            # Diagnose parser results
+            print(
+                f"Server {i} parsed result: role={res.get('role', 'UNKNOWN')}, "
+                f"fast_qps={res.get('fast_qps', 0)}, slow_qps={res.get('slow_qps', 0)}, "
+                f"num_data_points={res.get('num_data_points', 0)}"
+            )
+
+            if "role" not in res:
+                recorder.record_failure(
+                    benchmark="tao_bench",
+                    error_type="missing_role_in_result",
+                    reason=f"Parser result missing 'role' field for server {i}",
+                    metadata={
+                        "server_index": i,
+                        "logpath": logpath,
+                        "result": str(res),
+                    },
+                )
+                print(f"ERROR: Parser result missing 'role' field for server {i}")
+            elif res["role"] != "server":
+                recorder.record_failure(
+                    benchmark="tao_bench",
+                    error_type="incorrect_role",
+                    reason=f"Parser result has incorrect role: {res['role']} (expected 'server')",
+                    metadata={
+                        "server_index": i,
+                        "logpath": logpath,
+                        "role": res["role"],
+                    },
+                )
+                print(
+                    f"ERROR: Parser result has role '{res['role']}' instead of 'server' for server {i}"
+                )
+            else:
+                # Valid server result
                 results.append(res)
-    recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT))
+                print(f"Server {i}: Successfully parsed and added to results")
     recorder.merge_failure_to_results(
         results_dict=overall,
     )
diff --git a/packages/tao_bench/run_standalone.py b/packages/tao_bench/run_standalone.py
index d479c072f..54dc26ef2 100755
--- a/packages/tao_bench/run_standalone.py
+++ b/packages/tao_bench/run_standalone.py
@@ -183,6 +183,10 @@ def launch_server(port_number_start=11211, bind_cpu=1, bind_mem=1):
     if hasattr(args, "num_slow_threads") and args.num_slow_threads > 0:
         script_args["--num-slow-threads"] = args.num_slow_threads
 
+    # Add poll_interval if specified
+    if hasattr(args, "poll_interval") and args.poll_interval > 0:
+        script_args["--poll-interval"] = args.poll_interval
+
    cmd = [f"{TAO_BENCH_DIR}/run_autoscale.py --real"]
 
     for argname, argval in script_args.items():