From d6ad505c59fc32e2a4c16acd6918521f9e7c60b7 Mon Sep 17 00:00:00 2001 From: John OHara Date: Fri, 1 Apr 2022 08:59:58 +0100 Subject: [PATCH 1/2] Refactor HpoService - Decouple REST service from optuna_hpo with a new class HpoService. Allow further apis to be introduced - Managed thread creation in hpo_service - Use Thread wait()/notify() to sync threads for providing trial data to optuna Objective function - Use Thread wait()/notify() to sync threads when a new experiment is started - Sync rest and experiment threads to ensure experiment is ready before returning from rest call - Ensure thread safe access to TrialDetails objects --- src/bayes_optuna/optuna_hpo.py | 248 +++++++++++++++++++++------------ src/hpo_service.py | 89 ++++++++++++ src/service.py | 49 ++----- 3 files changed, 263 insertions(+), 123 deletions(-) create mode 100644 src/hpo_service.py diff --git a/src/bayes_optuna/optuna_hpo.py b/src/bayes_optuna/optuna_hpo.py index 736c9f5..daa5449 100644 --- a/src/bayes_optuna/optuna_hpo.py +++ b/src/bayes_optuna/optuna_hpo.py @@ -15,9 +15,9 @@ """ import optuna +import threading import os -import time from logger import get_logger @@ -37,17 +37,151 @@ class TrialDetails: trial_number = -1 trial_json_object = {} - trial_result_received = -1 trial_result = "" result_value_type = "" result_value = 0 +class HpoExperiment: + """ + HpoExperiment contains the details of a Running experiment. + """ + thread: threading.Thread + experiment_name: str + total_trials: int + parallel_trials: int + direction: str + hpo_algo_impl: str + id_: str + objective_function: str + tunables: str + value_type: str + trialDetails = TrialDetails() + resultsAvailableCond = threading.Condition() + experimentStartedCond = threading.Condition() + started = False + + def __init__(self, experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, id_, objective_function, tunables, value_type): + self.experiment_name = experiment_name + self.total_trials = total_trials + self.parallel_trials = parallel_trials + self.direction = direction + self.hpo_algo_impl = hpo_algo_impl + self.id_ = id_ + self.objective_function = objective_function + self.tunables = tunables + self.value_type = value_type + + self.thread = threading.Thread(target=self.recommend) + + def start(self) -> threading.Condition: + try: + self.experimentStartedCond.acquire() + self.thread.start() + finally: + self.experimentStartedCond.release() + return self.experimentStartedCond + + def hasStarted(self) -> bool: + started = False + try: + self.experimentStartedCond.acquire() + started = self.started + finally: + self.experimentStartedCond.release() + return started + + def notifyStarted(self): + #notify hpo_service.startExperiment() that experiment is ready to accept results + if not self.hasStarted(): + try: + self.experimentStartedCond.acquire() + self.started = True + self.experimentStartedCond.notify() + finally: + self.experimentStartedCond.release() + + + def perform_experiment(self): + self.resultsAvailableCond.acquire() + self.resultsAvailableCond.wait() + result_value = self.trialDetails.result_value + trial_result = self.trialDetails.trial_result + self.resultsAvailableCond.release() + return result_value, trial_result + + def recommend(self): + """ + Perform Bayesian Optimization with Optuna using the appropriate sampler and recommend the best config. + + Parameters: + experiment_name (str): The name of the application that is being optimized. + direction (str): Direction of optimization, minimize or maximize. + hpo_algo_impl (str): Hyperparameter optimization library to perform Bayesian Optimization. + id_ (str): The id of the application that is being optimized. + objective_function (str): The objective function that is being optimized. + tunables (list): A list containing the details of each tunable in a dictionary format. + value_type (string): Value type of the objective function. + """ + # Set the logging level for the Optuna’s root logger + optuna.logging.set_verbosity(optuna.logging.WARNING) + # Propagate all of Optuna log outputs to the root logger + optuna.logging.enable_propagation() + # Disable the default handler of the Optuna’s root logger + optuna.logging.disable_default_handler() + + # Choose a sampler based on the value of ml_algo_impl + if self.hpo_algo_impl == "optuna_tpe": + sampler = optuna.samplers.TPESampler() + elif self.hpo_algo_impl == "optuna_tpe_multivariate": + sampler = optuna.samplers.TPESampler(multivariate=True) + elif self.hpo_algo_impl == "optuna_skopt": + sampler = optuna.integration.SkoptSampler() + + # Create a study object + study = optuna.create_study(direction=self.direction, sampler=sampler) + + # Execute an optimization by using an 'Objective' instance + study.optimize(Objective(self), n_trials=self.total_trials, n_jobs=self.parallel_trials) + + self.trialDetails.trial_number = -1 + + # Get the best parameter + logger.info("Best parameter: " + str(study.best_params)) + # Get the best value + logger.info("Best value: " + str(study.best_value)) + # Get the best trial + logger.info("Best trial: " + str(study.best_trial)) + + logger.debug("All trials: " + str(trials)) + + # recommended_config (json): A JSON containing the recommended config. + recommended_config = {} + + optimal_value = {"objective_function": { + "name": self.objective_function, + "value": study.best_value, + "value_type": self.value_type + }, "tunables": []} + + for tunable in self.tunables: + for key, value in study.best_params.items(): + if key == tunable["name"]: + tunable_value = value + optimal_value["tunables"].append( + { + "name": tunable["name"], + "value": tunable_value, + "value_type": tunable["value_type"] + } + ) + + recommended_config["id"] = self.id_ + recommended_config["experiment_name"] = self.experiment_name + recommended_config["direction"] = self.direction + recommended_config["optimal_value"] = optimal_value + + logger.info("Recommended config: " + str(recommended_config)) -def perform_experiment(): - # Loop to be replaced by a queueing mechanism to determine if the result has been received - while TrialDetails.trial_result_received == -1: - time.sleep(1) - return TrialDetails.result_value, TrialDetails.trial_result class Objective(TrialDetails): @@ -58,8 +192,9 @@ class Objective(TrialDetails): tunables (list): A list containing the details of each tunable in a dictionary format. """ - def __init__(self, tunables): - self.tunables = tunables + def __init__(self, experiment: HpoExperiment): + self.experiment: HpoExperiment = experiment + self.tunables = experiment.tunables def __call__(self, trial): global trials @@ -67,7 +202,16 @@ def __call__(self, trial): experiment_tunables = [] config = {} - TrialDetails.trial_number += 1 + try: + self.experiment.resultsAvailableCond.acquire() + self.experiment.trialDetails.trial_number += 1 + self.experiment.trialDetails.trial_json_object = {} + self.experiment.trialDetails.trial_result = "" + self.experiment.trialDetails.result_value_type = "" + self.experiment.trialDetails.result_value = 0 + finally: + self.experiment.resultsAvailableCond.release() + # Define search space for tunable in self.tunables: @@ -88,11 +232,15 @@ def __call__(self, trial): logger.debug("Experiment tunables: " + str(experiment_tunables)) - TrialDetails.trial_json_object = experiment_tunables + try: + self.experiment.resultsAvailableCond.acquire() + self.experiment.trialDetails.trial_json_object = experiment_tunables + finally: + self.experiment.resultsAvailableCond.release() - actual_slo_value, experiment_status = perform_experiment() + self.experiment.notifyStarted() - TrialDetails.trial_result_received = -1 + actual_slo_value, experiment_status = self.experiment.perform_experiment() config["experiment_status"] = experiment_status @@ -103,77 +251,3 @@ def __call__(self, trial): actual_slo_value = round(float(actual_slo_value), 2) return actual_slo_value - - -def recommend(experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, id_, objective_function, tunables, value_type): - """ - Perform Bayesian Optimization with Optuna using the appropriate sampler and recommend the best config. - - Parameters: - experiment_name (str): The name of the application that is being optimized. - direction (str): Direction of optimization, minimize or maximize. - hpo_algo_impl (str): Hyperparameter optimization library to perform Bayesian Optimization. - id_ (str): The id of the application that is being optimized. - objective_function (str): The objective function that is being optimized. - tunables (list): A list containing the details of each tunable in a dictionary format. - value_type (string): Value type of the objective function. - """ - # Set the logging level for the Optuna’s root logger - optuna.logging.set_verbosity(optuna.logging.WARNING) - # Propagate all of Optuna log outputs to the root logger - optuna.logging.enable_propagation() - # Disable the default handler of the Optuna’s root logger - optuna.logging.disable_default_handler() - - # Choose a sampler based on the value of ml_algo_impl - if hpo_algo_impl == "optuna_tpe": - sampler = optuna.samplers.TPESampler() - elif hpo_algo_impl == "optuna_tpe_multivariate": - sampler = optuna.samplers.TPESampler(multivariate=True) - elif hpo_algo_impl == "optuna_skopt": - sampler = optuna.integration.SkoptSampler() - - # Create a study object - study = optuna.create_study(direction=direction, sampler=sampler) - - # Execute an optimization by using an 'Objective' instance - study.optimize(Objective(tunables), n_trials=total_trials, n_jobs=parallel_trials) - - TrialDetails.trial_number = -1 - - # Get the best parameter - logger.info("Best parameter: " + str(study.best_params)) - # Get the best value - logger.info("Best value: " + str(study.best_value)) - # Get the best trial - logger.info("Best trial: " + str(study.best_trial)) - - logger.debug("All trials: " + str(trials)) - - # recommended_config (json): A JSON containing the recommended config. - recommended_config = {} - - optimal_value = {"objective_function": { - "name": objective_function, - "value": study.best_value, - "value_type": value_type - }, "tunables": []} - - for tunable in tunables: - for key, value in study.best_params.items(): - if key == tunable["name"]: - tunable_value = value - optimal_value["tunables"].append( - { - "name": tunable["name"], - "value": tunable_value, - "value_type": tunable["value_type"] - } - ) - - recommended_config["id"] = id_ - recommended_config["experiment_name"] = experiment_name - recommended_config["direction"] = direction - recommended_config["optimal_value"] = optimal_value - - logger.info("Recommended config: " + str(recommended_config)) diff --git a/src/hpo_service.py b/src/hpo_service.py new file mode 100644 index 0000000..7d42b1c --- /dev/null +++ b/src/hpo_service.py @@ -0,0 +1,89 @@ +import threading +import json +from bayes_optuna import optuna_hpo +from typing import TypedDict + +class HpoService: + """ + HpoService manages all running experiments, including starting experiments, updating trial results and returning optimized configurations + """ + + def __init__(self): + self.experiments = {} + + def newExperiment(self, id_, experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, objective_function, tunables, value_type): + if self.containsExperiment(id_): + print("Experiment already exists") + return + + self.experiments[id_] = optuna_hpo.HpoExperiment(experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, id_, objective_function, tunables, value_type) + + + def startExperiment(self, id_): + experiment: optuna_hpo.HpoExperiment = self.experiments.get(id_) + started: threading.Condition = experiment.start(); + try: + started.acquire() + value = started.wait(10) #wait with timeout of 10s + finally: + started.release() + + if not value: + print("Starting experiment timed out!") + + def containsExperiment(self, id_): + if self.experiments is None or not self.experiments : + return False + return id_ in self.experiments.keys() + + def doesNotContainExperiment(self, id_): + return not self.containsExperiment(id_) + + def getExperiment(self, id_) -> optuna_hpo.HpoExperiment: + if self.doesNotContainExperiment(id_): + print("Experiment does not exist") + return + + return self.experiments.get(id_) + + + def get_trial_number(self, id_): + + experiment: optuna_hpo.HpoExperiment = self.getExperiment(id_) + """Return the trial number.""" + if experiment.hpo_algo_impl in ("optuna_tpe", "optuna_tpe_multivariate", "optuna_skopt"): + try: + experiment.resultsAvailableCond.acquire() + trial_number = experiment.trialDetails.trial_number + finally: + experiment.resultsAvailableCond.release() + return trial_number + + + def get_trial_json_object(self, id_): + experiment: optuna_hpo.HpoExperiment = self.getExperiment(id_) + """Return the trial json object.""" + if experiment.hpo_algo_impl in ("optuna_tpe", "optuna_tpe_multivariate", "optuna_skopt"): + try: + experiment.resultsAvailableCond.acquire() + trial_json_object = json.dumps(experiment.trialDetails.trial_json_object) + finally: + experiment.resultsAvailableCond.release() + return trial_json_object + + + def set_result(self, id_, trial_result, result_value_type, result_value): + experiment: optuna_hpo.HpoExperiment = self.getExperiment(id_) + """Set the details of a trial.""" + if experiment.hpo_algo_impl in ("optuna_tpe", "optuna_tpe_multivariate", "optuna_skopt"): + try: + experiment.resultsAvailableCond.acquire() + experiment.trialDetails.trial_result = trial_result + experiment.trialDetails.result_value_type = result_value_type + experiment.trialDetails.result_value = result_value + experiment.resultsAvailableCond.notify() + finally: + experiment.resultsAvailableCond.release() + + +instance: HpoService = HpoService(); \ No newline at end of file diff --git a/src/service.py b/src/service.py index ced0811..8e553f7 100644 --- a/src/service.py +++ b/src/service.py @@ -19,10 +19,10 @@ import cgi import json import requests -import threading import time from urllib.parse import urlparse, parse_qs +import hpo_service from json_validate import validate_trial_generate_json from tunables import get_all_tunables @@ -94,9 +94,9 @@ def do_GET(self): if re.search(api_endpoint, self.path): query = parse_qs(urlparse(self.path).query) - if ("experiment_id" in query and "trial_number" in query and query["experiment_id"][0] in autotune_object_ids.keys() and - query["trial_number"][0] == str(get_trial_number(query["experiment_id"][0]))): - data = get_trial_json_object(query["experiment_id"][0]) + if ("experiment_id" in query and "trial_number" in query and hpo_service.instance.containsExperiment(query["experiment_id"][0]) and + query["trial_number"][0] == str(hpo_service.instance.get_trial_number(query["experiment_id"][0]))): + data = hpo_service.instance.get_trial_json_object(query["experiment_id"][0]) self._set_response(200, data) else: self._set_response(404, "-1") @@ -110,7 +110,7 @@ def handle_generate_new_operation(self, json_object): search_space_json = json_object["search_space"] if is_valid_json_object and json_object["search_space"]["experiment_id"] not in autotune_object_ids.keys(): get_search_create_study(search_space_json, json_object["operation"]) - trial_number = get_trial_number(json_object["search_space"]["experiment_id"]) + trial_number = hpo_service.instance.get_trial_number(json_object["search_space"]["experiment_id"]) self._set_response(200, str(trial_number)) else: self._set_response(400, "-1") @@ -121,16 +121,16 @@ def handle_generate_subsequent_operation(self, json_object): if is_valid_json_object and json_object["experiment_id"] in autotune_object_ids.keys(): get_search_create_study(search_space_json, json_object["operation"]) - trial_number = get_trial_number(json_object["experiment_id"]) + trial_number = hpo_service.instance.get_trial_number(json_object["experiment_id"]) self._set_response(200, str(trial_number)) else: self._set_response(400, "-1") def handle_result_operation(self, json_object): """Process EXP_TRIAL_RESULT operation.""" - if (json_object["experiment_id"] in autotune_object_ids.keys() and - json_object["trial_number"] == get_trial_number(json_object["experiment_id"])): - set_result(json_object["experiment_id"], json_object["trial_result"], json_object["result_value_type"], + if (hpo_service.instance.containsExperiment(json_object["experiment_id"]) and + json_object["trial_number"] == hpo_service.instance.get_trial_number(json_object["experiment_id"])): + hpo_service.instance.set_result(json_object["experiment_id"], json_object["trial_result"], json_object["result_value_type"], json_object["result_value"]) self._set_response(200, "0") else: @@ -143,12 +143,11 @@ def get_search_create_study(search_space_json, operation): if operation == "EXP_TRIAL_GENERATE_NEW": experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, id_, objective_function, tunables, value_type = get_all_tunables( search_space_json) - autotune_object_ids[id_] = hpo_algo_impl if hpo_algo_impl in ("optuna_tpe", "optuna_tpe_multivariate", "optuna_skopt"): - threading.Thread( - target=optuna_hpo.recommend, args=(experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, id_, objective_function, - tunables, value_type)).start() - time.sleep(2) + hpo_service.instance.newExperiment(id_, experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, objective_function, + tunables, value_type) + print("Starting Experiment: " + experiment_name) + hpo_service.instance.startExperiment(experiment_name) def get_search_space(id_, url): @@ -160,28 +159,6 @@ def get_search_space(id_, url): return search_space_json -def get_trial_number(id_): - """Return the trial number.""" - if autotune_object_ids[id_] in ("optuna_tpe", "optuna_tpe_multivariate", "optuna_skopt"): - trial_number = optuna_hpo.TrialDetails.trial_number - return trial_number - - -def get_trial_json_object(id_): - """Return the trial json object.""" - if autotune_object_ids[id_] in ("optuna_tpe", "optuna_tpe_multivariate", "optuna_skopt"): - trial_json_object = json.dumps(optuna_hpo.TrialDetails.trial_json_object) - return trial_json_object - - -def set_result(id_, trial_result, result_value_type, result_value): - """Set the details of a trial.""" - if autotune_object_ids[id_] in ("optuna_tpe", "optuna_tpe_multivariate", "optuna_skopt"): - optuna_hpo.TrialDetails.trial_result = trial_result - optuna_hpo.TrialDetails.result_value_type = result_value_type - optuna_hpo.TrialDetails.result_value = result_value - optuna_hpo.TrialDetails.trial_result_received = 1 - def main(): host_name = get_Host_name_IP() From 46a523e8d8a81ea4c879229f873db2833d8819a9 Mon Sep 17 00:00:00 2001 From: John OHara Date: Thu, 7 Apr 2022 11:34:02 +0100 Subject: [PATCH 2/2] Cleanup after rebase --- src/hpo_service.py | 2 +- src/service.py | 20 +++++++------------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/src/hpo_service.py b/src/hpo_service.py index 7d42b1c..9453865 100644 --- a/src/hpo_service.py +++ b/src/hpo_service.py @@ -21,7 +21,7 @@ def newExperiment(self, id_, experiment_name, total_trials, parallel_trials, dir def startExperiment(self, id_): experiment: optuna_hpo.HpoExperiment = self.experiments.get(id_) - started: threading.Condition = experiment.start(); + started: threading.Condition = experiment.start() try: started.acquire() value = started.wait(10) #wait with timeout of 10s diff --git a/src/service.py b/src/service.py index 8e553f7..1dfddef 100644 --- a/src/service.py +++ b/src/service.py @@ -22,7 +22,6 @@ import time from urllib.parse import urlparse, parse_qs -import hpo_service from json_validate import validate_trial_generate_json from tunables import get_all_tunables @@ -42,11 +41,7 @@ print("No. of Trials = ",n_trials) print("No. of Jobs = ",n_jobs) -from bayes_optuna import optuna_hpo - - -autotune_object_ids = {} -search_space_json = [] +import hpo_service api_endpoint = "/experiment_trials" server_port = 8085 @@ -107,8 +102,8 @@ def handle_generate_new_operation(self, json_object): """Process EXP_TRIAL_GENERATE_NEW operation.""" is_valid_json_object = validate_trial_generate_json(json_object) - search_space_json = json_object["search_space"] - if is_valid_json_object and json_object["search_space"]["experiment_id"] not in autotune_object_ids.keys(): + if is_valid_json_object and hpo_service.instance.doesNotContainExperiment(json_object["search_space"]["experiment_id"]): + search_space_json = json_object["search_space"] get_search_create_study(search_space_json, json_object["operation"]) trial_number = hpo_service.instance.get_trial_number(json_object["search_space"]["experiment_id"]) self._set_response(200, str(trial_number)) @@ -118,10 +113,9 @@ def handle_generate_new_operation(self, json_object): def handle_generate_subsequent_operation(self, json_object): """Process EXP_TRIAL_GENERATE_SUBSEQUENT operation.""" is_valid_json_object = validate_trial_generate_json(json_object) - - if is_valid_json_object and json_object["experiment_id"] in autotune_object_ids.keys(): - get_search_create_study(search_space_json, json_object["operation"]) - trial_number = hpo_service.instance.get_trial_number(json_object["experiment_id"]) + experiment_id = json_object["experiment_id"] + if is_valid_json_object and hpo_service.instance.containsExperiment(experiment_id): + trial_number = hpo_service.instance.get_trial_number(experiment_id) self._set_response(200, str(trial_number)) else: self._set_response(400, "-1") @@ -147,7 +141,7 @@ def get_search_create_study(search_space_json, operation): hpo_service.instance.newExperiment(id_, experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, objective_function, tunables, value_type) print("Starting Experiment: " + experiment_name) - hpo_service.instance.startExperiment(experiment_name) + hpo_service.instance.startExperiment(id_) def get_search_space(id_, url):