Use multi-thread for cloud experiments and multi-process for local ones #29

Draft · wants to merge 3 commits into main
64 changes: 54 additions & 10 deletions run_all_experiments.py
@@ -18,7 +18,9 @@
 import logging
 import os
 import sys
+import time
 import traceback
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from multiprocessing import Pool
 
 import run_one_experiment
@@ -51,7 +53,7 @@ def __init__(self, benchmark, result):
     self.result = result
 
 
-def get_experiment_configs(
+def _get_experiment_configs(
     args: argparse.Namespace
 ) -> list[tuple[benchmarklib.Benchmark, argparse.Namespace]]:
   """Constructs a list of experiment configs based on the |BENCHMARK_DIR| and
@@ -205,25 +207,67 @@ def _print_experiment_results(results: list[Result]):
         f'{result.result}\n')
 
 
+def _execute_benchmark_experiment(
+    config: tuple[benchmarklib.Benchmark, argparse.Namespace]) -> Result:
+  """
+  Executes one experiment with one benchmark |config| and returns the result.
+  """
+  result = run_experiments(*config)
+  _print_experiment_result(result)
+  return result


+def parallelize_experiments_in_threads(configs):
+  """Executes experiments concurrently in multiple threads."""
+  futures = []
+  with ThreadPoolExecutor(max_workers=NUM_EXP) as executor:
+    for config in configs:
+      futures.append(executor.submit(_execute_benchmark_experiment, config))
+      # Stagger thread creation to avoid a CPU usage spike from starting too
+      # many threads at once. Approx. 30s is sufficient because each thread
+      # soon becomes idle, waiting for cloud build results.
+      time.sleep(30)
+  return [future.result() for future in as_completed(futures)]
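One subtlety: `as_completed` yields futures in completion order, not submission order, so the list returned above is not aligned with `configs`. Each experiment is already reported individually by `_print_experiment_result`, but any caller that needs results matched to configs would have to track indices, as `check_targets` below does. A tiny illustration, not from this PR:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=2) as executor:
  slow = executor.submit(time.sleep, 0.2)
  fast = executor.submit(time.sleep, 0.0)
  # The fast task usually completes first, regardless of submission order.
  print([f is fast for f in as_completed([slow, fast])])  # [True, False]
```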


+def parallelize_experiments_in_processes(configs):
+  """Executes experiments concurrently in multiple processes."""
+  results = []
+  with Pool(NUM_EXP) as pool:
+    for config in configs:
+      # |apply_async| takes its arguments as a tuple, so wrap |config|.
+      results.append(
+          pool.apply_async(_execute_benchmark_experiment, (config,)))
+      # Stagger process creation to avoid a CPU usage spike from starting
+      # too many local builds at once.
+      time.sleep(30)
+    # Collect results before the `with` block exits and terminates the pool.
+    return [result.get() for result in results]
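Two multiprocessing details are easy to get wrong here: `Pool.apply_async` receives its positional arguments as a tuple, so a single tuple-valued config must be wrapped as `(config,)`, and `Pool.__exit__` calls `terminate()`, so every `AsyncResult.get()` must happen inside the `with` block. A minimal standalone sketch, where `echo` is a hypothetical stand-in for `_execute_benchmark_experiment`:

```python
from multiprocessing import Pool


def echo(config):
  """Hypothetical worker that takes a single tuple argument."""
  return config


if __name__ == '__main__':
  configs = [('benchmark-a', 1), ('benchmark-b', 2)]
  with Pool(2) as p:
    # Wrap each tuple so apply_async passes it as ONE positional argument.
    async_results = [p.apply_async(echo, (c,)) for c in configs]
    # get() inside the `with` block; exiting it terminates the workers.
    print([r.get() for r in async_results])
```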


 def main():
   logging.basicConfig(level=logging.INFO)
   args = parse_args()
   run_one_experiment.prepare()
 
-  experiment_configs = get_experiment_configs(args)
+  experiment_configs = _get_experiment_configs(args)
   experiment_results = []
 
   print(f'Running {NUM_EXP} experiment(s) in parallel.')
   if NUM_EXP == 1:
-    for config in experiment_configs:
-      result = run_experiments(*config)
-      experiment_results.append(result)
-      _print_experiment_result(result)
+    experiment_results = [
+        _execute_benchmark_experiment(config) for config in experiment_configs
+    ]
+  elif args.cloud_experiment_name:
+    # Use multiple threads for cloud experiments, because each thread only
+    # waits for cloud build results or performs simple I/O tasks.
+    experiment_results = parallelize_experiments_in_threads(experiment_configs)
   else:
-    with Pool(NUM_EXP) as p:
-      for result in p.starmap(run_experiments, experiment_configs):
-        experiment_results.append(result)
-        _print_experiment_result(result)
+    # Use multiple processes for local experiments, because each process must
+    # build fuzz targets in local Docker containers.
+    experiment_results = parallelize_experiments_in_processes(
+        experiment_configs)
 
   _print_experiment_results(experiment_results)
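The dispatch above encodes the usual CPython trade-off: threads are cheap and sufficient when workers mostly block on I/O (waiting on cloud builds releases the GIL), while separate processes are needed when workers do CPU-heavy work (local Docker builds). A minimal sketch of the same shape; `is_cloud` and `wait_for_build` are hypothetical names, not from this PR:

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def wait_for_build(name):
  """I/O-bound stand-in: blocking in sleep() releases the GIL."""
  time.sleep(0.1)
  return name


def run_all(names, is_cloud):
  # Threads suffice for I/O-bound waits; processes sidestep the GIL for
  # CPU-bound local builds.
  executor_cls = ThreadPoolExecutor if is_cloud else ProcessPoolExecutor
  with executor_cls(max_workers=4) as executor:
    return list(executor.map(wait_for_build, names))


if __name__ == '__main__':
  print(run_all(['a', 'b', 'c'], is_cloud=True))
```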

34 changes: 26 additions & 8 deletions run_one_experiment.py
@@ -19,6 +19,7 @@
 import os
 import shutil
 import subprocess
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from multiprocessing import pool
 from typing import List, Optional
@@ -174,14 +175,31 @@ def check_targets(
   evaluator = exp_evaluator.Evaluator(builder_runner, benchmark, work_dirs)
 
   ai_target_pairs = [(ai_binary, target) for target in generated_targets]
-  with pool.ThreadPool(NUM_EVA) as p:
-    for i, target_stat in enumerate(
-        p.starmap(evaluator.check_target, ai_target_pairs)):
-      if target_stat is None:
-        print(f'Error evaluating target {generated_targets[i]}')
-        continue
-
-      target_stats.append((i, target_stat))
+  if cloud_experiment_name:
+    # Use multiple threads for cloud experiments, because each thread only
+    # waits for cloud build results or performs simple I/O tasks.
+    with ThreadPoolExecutor(max_workers=NUM_EVA) as executor:
+      future_to_index = {
+          executor.submit(evaluator.check_target, *pair): i
+          for i, pair in enumerate(ai_target_pairs)
+      }
+      for future in as_completed(future_to_index):
+        i = future_to_index[future]
+        target_stat = future.result()
+        if target_stat is None:
+          print(f'Error evaluating target {generated_targets[i]}')
+          continue
+
+        target_stats.append((i, target_stat))
+  else:
+    # For local experiments, each worker builds fuzz targets in local Docker
+    # containers, so keep the original starmap-based pool.
+    with pool.ThreadPool(NUM_EVA) as p:
+      for i, target_stat in enumerate(
+          p.starmap(evaluator.check_target, ai_target_pairs)):
+        if target_stat is None:
+          print(f'Error evaluating target {generated_targets[i]}')
+          continue
+
+        target_stats.append((i, target_stat))
 
   if len(target_stats) > 0:
     return aggregate_results(target_stats, generated_targets)
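The `future_to_index` mapping is the standard `concurrent.futures` recipe for recovering each result's original index while consuming futures in completion order. A self-contained sketch, where `check` is a hypothetical stand-in for `evaluator.check_target`:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def check(binary, target):
  """Hypothetical stand-in; returns None to signal an evaluation error."""
  return None if target == 'bad' else f'{binary}:{target}'


pairs = [('ai', 'good'), ('ai', 'bad'), ('ai', 'ok')]
stats = []
with ThreadPoolExecutor(max_workers=3) as executor:
  # Map each future back to the index of the pair that produced it.
  future_to_index = {
      executor.submit(check, *pair): i for i, pair in enumerate(pairs)
  }
  for future in as_completed(future_to_index):
    i = future_to_index[future]
    stat = future.result()
    if stat is None:
      continue  # Skip failed evaluations, mirroring check_targets.
    stats.append((i, stat))
print(sorted(stats))  # [(0, 'ai:good'), (2, 'ai:ok')]
```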