Skip to content

Commit 25fc6f1

Browse files
DCPerf mini: Adding a polling mechanism for shrinking Tao-bench (#308)
Summary: Pull Request resolved: #308. Shrink Tao-bench further by adding a polling mechanism. Reviewed By: charles-typ. Differential Revision: D86997642
1 parent 7c44c03 commit 25fc6f1

File tree

6 files changed

+215
-25
lines changed

6 files changed

+215
-25
lines changed

benchpress/config/jobs.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,7 @@
10241024
- '--warmup-timeout-buffer={warmup_timeout_buffer}'
10251025
- '--test-timeout-buffer={test_timeout_buffer}'
10261026
- '--postprocessing-timeout-buffer={postprocessing_timeout_buffer}'
1027+
- '--poll-interval={poll_interval}'
10271028
vars:
10281029
- 'num_servers=0'
10291030
- 'memsize=0.5'
@@ -1043,7 +1044,8 @@
10431044
- 'num_client_threads=0'
10441045
- 'warmup_timeout_buffer=0'
10451046
- 'test_timeout_buffer=0'
1046-
- 'postprocessing_timeout_buffer=50'
1047+
- 'postprocessing_timeout_buffer=60'
1048+
- 'poll_interval=0.2'
10471049
hooks:
10481050
- hook: copymove
10491051
options:
File renamed without changes.

packages/tao_bench/args_utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,13 @@ def add_common_server_args(server_parser: ArgumentParser) -> List[Tuple[str, str
159159
default=60,
160160
help="extra time buffer for server to complete postprocessing in seconds",
161161
)
162+
server_parser.add_argument(
163+
"--poll-interval",
164+
type=float,
165+
default=0,
166+
help="poll interval in seconds for process completion detection; "
167+
+ "if > 0, use polling mechanism instead of fixed timeout",
168+
)
162169
server_parser.add_argument("--real", action="store_true", help="for real")
163170

164171
return get_opt_strings(server_parser)

packages/tao_bench/run.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717

1818
import args_utils
1919

20-
# Add parent directory to path to import diagnosis_utils
21-
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
22-
from diagnosis_utils import check_port_available, DiagnosisRecorder
2320

2421
sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common"))
2522
import breakdown_utils
23+
from diagnosis_utils import check_port_available, DiagnosisRecorder
2624

2725

2826
BENCHPRESS_ROOT = pathlib.Path(os.path.abspath(__file__)).parents[2]

packages/tao_bench/run_autoscale.py

Lines changed: 200 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
import subprocess
1515

1616
import sys
17+
import time
1718
from datetime import datetime
1819
from parser import TaoBenchParser
1920

2021
import args_utils
2122

2223
# Add parent directory to path to import diagnosis_utils
23-
sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
24+
sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common"))
2425
from diagnosis_utils import DiagnosisRecorder
2526

2627

@@ -335,30 +336,130 @@ def run_server(args):
335336
procs.append(p)
336337
# wait for servers to finish - add extra time to make sure
337338
# post-processing will finish
338-
timeout = (
339-
args_utils.get_warmup_time(args)
340-
+ args.test_time
341-
+ args.timeout_buffer
342-
+ args.postprocessing_timeout_buffer
343-
)
344-
for p in procs:
345-
try:
346-
(out, err) = p.communicate(timeout=timeout)
347-
except subprocess.TimeoutExpired:
348-
# Kill the entire process group
339+
if args.poll_interval > 0:
340+
# Use intelligent process polling instead of fixed timeout
341+
# First wait for base timeout (warmup + test + timeout_buffer)
342+
base_timeout = (
343+
args_utils.get_warmup_time(args) + args.test_time + args.timeout_buffer
344+
)
345+
346+
print(f"Waiting {base_timeout}s for processes to complete normally...")
347+
time.sleep(base_timeout)
348+
349+
# Poll for additional postprocessing_timeout_buffer time
350+
# Check if log files are still being written to
351+
print(
352+
f"Polling for up to {args.postprocessing_timeout_buffer}s for processes to finish writing output..."
353+
)
354+
start_time = time.time()
355+
356+
# Track file sizes to detect when processes stop writing
357+
# Dict: logpath -> (last_size, stable_count)
358+
file_states = {server[2]: (0, 0) for server in servers}
359+
stable_threshold = (
360+
3 # Number of consecutive stable checks before considering done
361+
)
362+
363+
while time.time() - start_time < args.postprocessing_timeout_buffer:
364+
all_stable = True
365+
366+
for server in servers:
367+
logpath = server[2]
368+
try:
369+
# Flush the file buffer to ensure size is up-to-date
370+
server[1].flush()
371+
os.fsync(server[1].fileno())
372+
373+
current_size = os.path.getsize(logpath)
374+
last_size, stable_count = file_states[logpath]
375+
376+
if current_size == last_size and current_size != 0:
377+
# File size hasn't changed, increment stable count
378+
stable_count += 1
379+
file_states[logpath] = (current_size, stable_count)
380+
381+
if stable_count < stable_threshold:
382+
all_stable = False
383+
else:
384+
# File is still growing or is zero, reset stable count
385+
file_states[logpath] = (current_size, 0)
386+
all_stable = False
387+
388+
except (OSError, ValueError):
389+
# File might be closed or inaccessible, consider it stable
390+
pass
391+
392+
if all_stable:
393+
elapsed = time.time() - start_time
394+
print(f"All log files stable after {elapsed:.2f}s of polling")
395+
break
396+
397+
time.sleep(args.poll_interval)
398+
else:
399+
# Timeout reached - kill any remaining processes
400+
total_time = base_timeout + args.postprocessing_timeout_buffer
401+
print(
402+
f"Timeout reached after {total_time}s (base: {base_timeout}s + polling: {args.postprocessing_timeout_buffer}s), killing remaining processes"
403+
)
404+
for p in procs:
405+
if p.poll() is None: # Process still running
406+
try:
407+
os.killpg(os.getpgid(p.pid), 9)
408+
except (ProcessLookupError, PermissionError):
409+
pass
410+
411+
# Ensure all processes are collected and output is flushed
412+
for p in procs:
349413
try:
350-
os.killpg(os.getpgid(p.pid), 9)
351-
except ProcessLookupError:
352-
pass # Process already terminated
353-
(out, err) = p.communicate()
354-
finally:
355-
# Ensure cleanup even if process completed successfully
414+
p.communicate(timeout=1)
415+
except subprocess.TimeoutExpired:
416+
pass
417+
418+
# Explicitly flush all log file handles to ensure data is written to disk
419+
for server in servers:
420+
try:
421+
server[1].flush() # Flush the file buffer
422+
os.fsync(server[1].fileno()) # Force OS to write to disk
423+
except (OSError, ValueError):
424+
pass # Handle already closed files or invalid file descriptors
425+
426+
# Final cleanup to ensure process groups are terminated
427+
for p in procs:
356428
try:
357429
os.killpg(os.getpgid(p.pid), 9)
358430
except (ProcessLookupError, PermissionError):
359431
pass # Process already terminated or we don't have permission
432+
else:
433+
# Original behavior with fixed timeout
434+
timeout = (
435+
args_utils.get_warmup_time(args)
436+
+ args.test_time
437+
+ args.timeout_buffer
438+
+ args.postprocessing_timeout_buffer
439+
)
440+
441+
for p in procs:
442+
try:
443+
(out, err) = p.communicate(timeout=timeout)
444+
except subprocess.TimeoutExpired:
445+
# Kill the entire process group
446+
try:
447+
os.killpg(os.getpgid(p.pid), 9)
448+
except ProcessLookupError:
449+
pass # Process already terminated
450+
(out, err) = p.communicate()
451+
finally:
452+
# Ensure cleanup even if process completed successfully
453+
try:
454+
os.killpg(os.getpgid(p.pid), 9)
455+
except (ProcessLookupError, PermissionError):
456+
pass # Process already terminated or we don't have permission
360457
for server in servers:
361458
server[1].close()
459+
460+
# Initialize diagnosis recorder for detailed logging
461+
recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT))
462+
362463
# parse results
363464
results = []
364465
overall = {
@@ -375,14 +476,92 @@ def run_server(args):
375476
overall["latency(ms)"] = latency
376477
overall["bandwidth"] = bandwidth
377478

479+
# Diagnose log files and parsing
378480
for i in range(args.num_servers):
379481
logpath = servers[i][2]
482+
returncode = procs[i].returncode
483+
484+
# Check if log file exists and get size
485+
if not os.path.exists(logpath):
486+
recorder.record_failure(
487+
benchmark="tao_bench",
488+
error_type="log_file_missing",
489+
reason=f"Log file does not exist: {logpath}",
490+
metadata={"server_index": i, "logpath": logpath},
491+
)
492+
print(f"ERROR: Log file does not exist for server {i}: {logpath}")
493+
continue
494+
495+
log_size = os.path.getsize(logpath)
496+
print(
497+
f"Server {i}: Log file size = {log_size} bytes, return code = {returncode}"
498+
)
499+
500+
if log_size == 0:
501+
recorder.record_failure(
502+
benchmark="tao_bench",
503+
error_type="empty_log_file",
504+
reason=f"Log file is empty: {logpath}",
505+
metadata={
506+
"server_index": i,
507+
"logpath": logpath,
508+
"returncode": returncode,
509+
},
510+
)
511+
print(f"ERROR: Log file is empty for server {i}: {logpath}")
512+
continue
513+
514+
# Parse the log file
380515
with open(logpath, "r") as log:
516+
log_content = log.read()
517+
log.seek(0) # Reset to beginning for parser
518+
519+
# Show first few lines for debugging
520+
preview_lines = log_content.split("\n")[:5]
521+
print(f"Server {i} log preview (first 5 lines):")
522+
for line in preview_lines:
523+
print(f" {line}")
524+
381525
parser = TaoBenchParser(f"server_{i}.csv")
382-
res = parser.parse(log, None, procs[i].returncode)
383-
if "role" in res and res["role"] == "server":
526+
res = parser.parse(log, None, returncode)
527+
528+
# Diagnose parser results
529+
print(
530+
f"Server {i} parsed result: role={res.get('role', 'UNKNOWN')}, "
531+
f"fast_qps={res.get('fast_qps', 0)}, slow_qps={res.get('slow_qps', 0)}, "
532+
f"num_data_points={res.get('num_data_points', 0)}"
533+
)
534+
535+
if "role" not in res:
536+
recorder.record_failure(
537+
benchmark="tao_bench",
538+
error_type="missing_role_in_result",
539+
reason=f"Parser result missing 'role' field for server {i}",
540+
metadata={
541+
"server_index": i,
542+
"logpath": logpath,
543+
"result": str(res),
544+
},
545+
)
546+
print(f"ERROR: Parser result missing 'role' field for server {i}")
547+
elif res["role"] != "server":
548+
recorder.record_failure(
549+
benchmark="tao_bench",
550+
error_type="incorrect_role",
551+
reason=f"Parser result has incorrect role: {res['role']} (expected 'server')",
552+
metadata={
553+
"server_index": i,
554+
"logpath": logpath,
555+
"role": res["role"],
556+
},
557+
)
558+
print(
559+
f"ERROR: Parser result has role '{res['role']}' instead of 'server' for server {i}"
560+
)
561+
else:
562+
# Valid server result
384563
results.append(res)
385-
recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT))
564+
print(f"Server {i}: Successfully parsed and added to results")
386565
recorder.merge_failure_to_results(
387566
results_dict=overall,
388567
)

packages/tao_bench/run_standalone.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ def launch_server(port_number_start=11211, bind_cpu=1, bind_mem=1):
183183
if hasattr(args, "num_slow_threads") and args.num_slow_threads > 0:
184184
script_args["--num-slow-threads"] = args.num_slow_threads
185185

186+
# Add poll_interval if specified
187+
if hasattr(args, "poll_interval") and args.poll_interval > 0:
188+
script_args["--poll-interval"] = args.poll_interval
189+
186190
cmd = [f"{TAO_BENCH_DIR}/run_autoscale.py --real"]
187191

188192
for argname, argval in script_args.items():

0 commit comments

Comments (0)