Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor HpoService #9

Merged
merged 2 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 161 additions & 87 deletions src/bayes_optuna/optuna_hpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
"""

import optuna
import threading

import os
import time

from logger import get_logger

Expand All @@ -37,17 +37,151 @@ class TrialDetails:

trial_number = -1
trial_json_object = {}
trial_result_received = -1
trial_result = ""
result_value_type = ""
result_value = 0

class HpoExperiment:
    """
    HpoExperiment holds the state of one running hyperparameter-optimization
    experiment and drives an Optuna study on a background thread.

    Synchronization protocol (as visible in this file):
      - ``experimentStartedCond`` / ``started``: the background thread calls
        ``notifyStarted()`` once the first trial's config is ready; ``start()``
        returns this condition so the caller can wait on it.
      - ``resultsAvailableCond``: guards ``trialDetails``; the service thread
        writes a trial result and notifies, ``perform_experiment()`` waits.
    """

    # Type declarations only — the actual values are per-instance (see __init__).
    thread: threading.Thread
    experiment_name: str
    total_trials: int
    parallel_trials: int
    direction: str
    hpo_algo_impl: str
    id_: str
    objective_function: str
    tunables: str
    value_type: str

    def __init__(self, experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, id_, objective_function, tunables, value_type):
        self.experiment_name = experiment_name
        self.total_trials = total_trials
        self.parallel_trials = parallel_trials
        self.direction = direction
        self.hpo_algo_impl = hpo_algo_impl
        self.id_ = id_
        self.objective_function = objective_function
        self.tunables = tunables
        self.value_type = value_type

        # BUGFIX: these were previously *class* attributes, so every
        # HpoExperiment shared one TrialDetails and one pair of Condition
        # objects — two concurrent experiments would wake and trample each
        # other. They must be per-instance state.
        self.trialDetails = TrialDetails()
        self.resultsAvailableCond = threading.Condition()
        self.experimentStartedCond = threading.Condition()
        self.started = False

        self.thread = threading.Thread(target=self.recommend)

    def start(self) -> threading.Condition:
        """Launch the optimization thread; return the condition that is
        notified once the experiment is ready to accept results."""
        with self.experimentStartedCond:
            self.thread.start()
        return self.experimentStartedCond

    def hasStarted(self) -> bool:
        """Thread-safe read of the ``started`` flag."""
        with self.experimentStartedCond:
            return self.started

    def notifyStarted(self):
        # Notify hpo_service.startExperiment() that the experiment is ready
        # to accept results.
        if not self.hasStarted():
            with self.experimentStartedCond:
                self.started = True
                self.experimentStartedCond.notify()

    def perform_experiment(self):
        """Block until a trial result is posted via the service, then return
        ``(result_value, trial_result)``."""
        with self.resultsAvailableCond:
            # NOTE(review): wait() has no predicate or timeout — if the
            # notifier runs before this thread reaches wait(), the wakeup is
            # lost and this blocks forever. Confirm callers only post results
            # after the trial config has been handed out.
            self.resultsAvailableCond.wait()
            result_value = self.trialDetails.result_value
            trial_result = self.trialDetails.trial_result
        return result_value, trial_result

    def recommend(self):
        """
        Perform Bayesian Optimization with Optuna using the appropriate sampler
        and log the recommended (best) config.

        All inputs are taken from instance attributes set in ``__init__``
        (experiment_name, direction, hpo_algo_impl, id_, objective_function,
        tunables, value_type). Runs on the background thread started by
        ``start()``.
        """
        # Set the logging level for the Optuna root logger.
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        # Propagate all of Optuna's log output to the root logger.
        optuna.logging.enable_propagation()
        # Disable the default handler of the Optuna root logger.
        optuna.logging.disable_default_handler()

        # Choose a sampler based on the value of hpo_algo_impl.
        if self.hpo_algo_impl == "optuna_tpe":
            sampler = optuna.samplers.TPESampler()
        elif self.hpo_algo_impl == "optuna_tpe_multivariate":
            sampler = optuna.samplers.TPESampler(multivariate=True)
        elif self.hpo_algo_impl == "optuna_skopt":
            sampler = optuna.integration.SkoptSampler()
        else:
            # BUGFIX: an unrecognized value previously fell through and
            # crashed later with NameError on 'sampler'.
            raise ValueError("Unsupported hpo_algo_impl: " + str(self.hpo_algo_impl))

        # Create a study object.
        study = optuna.create_study(direction=self.direction, sampler=sampler)

        # Execute an optimization by using an 'Objective' instance.
        study.optimize(Objective(self), n_trials=self.total_trials, n_jobs=self.parallel_trials)

        # Sentinel: no trial is in flight any more.
        self.trialDetails.trial_number = -1

        # Get the best parameter.
        logger.info("Best parameter: " + str(study.best_params))
        # Get the best value.
        logger.info("Best value: " + str(study.best_value))
        # Get the best trial.
        logger.info("Best trial: " + str(study.best_trial))

        # BUGFIX: was ``str(trials)`` — a name not defined in this class;
        # the study itself records all completed trials.
        logger.debug("All trials: " + str(study.trials))

        # recommended_config (json): A JSON containing the recommended config.
        recommended_config = {}

        optimal_value = {"objective_function": {
            "name": self.objective_function,
            "value": study.best_value,
            "value_type": self.value_type
        }, "tunables": []}

        for tunable in self.tunables:
            for key, value in study.best_params.items():
                if key == tunable["name"]:
                    tunable_value = value
            optimal_value["tunables"].append(
                {
                    "name": tunable["name"],
                    "value": tunable_value,
                    "value_type": tunable["value_type"]
                }
            )

        recommended_config["id"] = self.id_
        recommended_config["experiment_name"] = self.experiment_name
        recommended_config["direction"] = self.direction
        recommended_config["optimal_value"] = optimal_value

        logger.info("Recommended config: " + str(recommended_config))

def perform_experiment():
    """Block until a trial result has been posted, then return it.

    Returns:
        tuple: (TrialDetails.result_value, TrialDetails.trial_result)
    """
    # Poll once per second until the result flag flips; the original author
    # notes this loop should be replaced by a queueing mechanism.
    while True:
        if TrialDetails.trial_result_received != -1:
            break
        time.sleep(1)
    return TrialDetails.result_value, TrialDetails.trial_result


class Objective(TrialDetails):
Expand All @@ -58,16 +192,26 @@ class Objective(TrialDetails):
tunables (list): A list containing the details of each tunable in a dictionary format.
"""

def __init__(self, tunables):
self.tunables = tunables
def __init__(self, experiment: HpoExperiment):
self.experiment: HpoExperiment = experiment
self.tunables = experiment.tunables

def __call__(self, trial):
global trials

experiment_tunables = []
config = {}

TrialDetails.trial_number += 1
try:
self.experiment.resultsAvailableCond.acquire()
self.experiment.trialDetails.trial_number += 1
self.experiment.trialDetails.trial_json_object = {}
self.experiment.trialDetails.trial_result = ""
self.experiment.trialDetails.result_value_type = ""
self.experiment.trialDetails.result_value = 0
finally:
self.experiment.resultsAvailableCond.release()


# Define search space
for tunable in self.tunables:
Expand All @@ -88,11 +232,15 @@ def __call__(self, trial):

logger.debug("Experiment tunables: " + str(experiment_tunables))

TrialDetails.trial_json_object = experiment_tunables
try:
self.experiment.resultsAvailableCond.acquire()
self.experiment.trialDetails.trial_json_object = experiment_tunables
finally:
self.experiment.resultsAvailableCond.release()

actual_slo_value, experiment_status = perform_experiment()
self.experiment.notifyStarted()

TrialDetails.trial_result_received = -1
actual_slo_value, experiment_status = self.experiment.perform_experiment()

config["experiment_status"] = experiment_status

Expand All @@ -103,77 +251,3 @@ def __call__(self, trial):

actual_slo_value = round(float(actual_slo_value), 2)
return actual_slo_value


def recommend(experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, id_, objective_function, tunables, value_type):
    """
    Perform Bayesian Optimization with Optuna using the appropriate sampler and recommend the best config.

    Parameters:
        experiment_name (str): The name of the application that is being optimized.
        total_trials (int): Number of trials to run.
        parallel_trials (int): Number of trials to run in parallel (Optuna n_jobs).
        direction (str): Direction of optimization, minimize or maximize.
        hpo_algo_impl (str): Hyperparameter optimization library to perform Bayesian Optimization.
        id_ (str): The id of the application that is being optimized.
        objective_function (str): The objective function that is being optimized.
        tunables (list): A list containing the details of each tunable in a dictionary format.
        value_type (string): Value type of the objective function.

    Raises:
        ValueError: If hpo_algo_impl is not one of the supported samplers.
    """
    # Set the logging level for the Optuna root logger.
    optuna.logging.set_verbosity(optuna.logging.WARNING)
    # Propagate all of Optuna's log output to the root logger.
    optuna.logging.enable_propagation()
    # Disable the default handler of the Optuna root logger.
    optuna.logging.disable_default_handler()

    # Choose a sampler based on the value of hpo_algo_impl.
    if hpo_algo_impl == "optuna_tpe":
        sampler = optuna.samplers.TPESampler()
    elif hpo_algo_impl == "optuna_tpe_multivariate":
        sampler = optuna.samplers.TPESampler(multivariate=True)
    elif hpo_algo_impl == "optuna_skopt":
        sampler = optuna.integration.SkoptSampler()
    else:
        # BUGFIX: an unrecognized value previously fell through and crashed
        # later with NameError on 'sampler'.
        raise ValueError("Unsupported hpo_algo_impl: " + str(hpo_algo_impl))

    # Create a study object.
    study = optuna.create_study(direction=direction, sampler=sampler)

    # Execute an optimization by using an 'Objective' instance.
    study.optimize(Objective(tunables), n_trials=total_trials, n_jobs=parallel_trials)

    # Sentinel: no trial is in flight any more.
    TrialDetails.trial_number = -1

    # Get the best parameter.
    logger.info("Best parameter: " + str(study.best_params))
    # Get the best value.
    logger.info("Best value: " + str(study.best_value))
    # Get the best trial.
    logger.info("Best trial: " + str(study.best_trial))

    # BUGFIX: was ``str(trials)``; the study records all completed trials.
    logger.debug("All trials: " + str(study.trials))

    # recommended_config (json): A JSON containing the recommended config.
    recommended_config = {}

    optimal_value = {"objective_function": {
        "name": objective_function,
        "value": study.best_value,
        "value_type": value_type
    }, "tunables": []}

    for tunable in tunables:
        for key, value in study.best_params.items():
            if key == tunable["name"]:
                tunable_value = value
        optimal_value["tunables"].append(
            {
                "name": tunable["name"],
                "value": tunable_value,
                "value_type": tunable["value_type"]
            }
        )

    recommended_config["id"] = id_
    recommended_config["experiment_name"] = experiment_name
    recommended_config["direction"] = direction
    recommended_config["optimal_value"] = optimal_value

    logger.info("Recommended config: " + str(recommended_config))
89 changes: 89 additions & 0 deletions src/hpo_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import threading
import json
from bayes_optuna import optuna_hpo
from typing import TypedDict

class HpoService:
    """
    HpoService manages all running experiments, including starting experiments,
    updating trial results and returning optimized configurations.
    """

    # Samplers for which trial bookkeeping is supported.
    SUPPORTED_ALGOS = ("optuna_tpe", "optuna_tpe_multivariate", "optuna_skopt")

    def __init__(self):
        # Map of experiment id -> optuna_hpo.HpoExperiment.
        self.experiments = {}

    def newExperiment(self, id_, experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, objective_function, tunables, value_type):
        """Register a new experiment under ``id_``; refuses to overwrite an existing one."""
        if self.containsExperiment(id_):
            print("Experiment already exists")
            return

        self.experiments[id_] = optuna_hpo.HpoExperiment(experiment_name, total_trials, parallel_trials, direction, hpo_algo_impl, id_, objective_function, tunables, value_type)

    def startExperiment(self, id_):
        """Start the experiment's background thread and block (up to 10s)
        until it signals that it is ready to accept results."""
        experiment = self.getExperiment(id_)
        if experiment is None:
            # BUGFIX: previously an unknown id crashed with AttributeError
            # on ``None.start()``.
            return
        started = experiment.start()
        with started:
            value = started.wait(10)  # wait with timeout of 10s

        if not value:
            print("Starting experiment timed out!")

    def containsExperiment(self, id_):
        """Return True if an experiment with this id is registered."""
        if not self.experiments:
            return False
        return id_ in self.experiments

    def doesNotContainExperiment(self, id_):
        """Inverse of :meth:`containsExperiment`."""
        return not self.containsExperiment(id_)

    def getExperiment(self, id_) -> "optuna_hpo.HpoExperiment":
        """Return the experiment for ``id_``, or None (with a message) if unknown."""
        if self.doesNotContainExperiment(id_):
            print("Experiment does not exist")
            return None

        return self.experiments.get(id_)

    def get_trial_number(self, id_):
        """Return the current trial number for the experiment, or None for
        an unsupported algorithm."""
        experiment = self.getExperiment(id_)
        # BUGFIX: for an unsupported algorithm the original returned an
        # unbound local (NameError); the docstring was also placed after the
        # first statement, where it is dead text.
        trial_number = None
        if experiment.hpo_algo_impl in self.SUPPORTED_ALGOS:
            with experiment.resultsAvailableCond:
                trial_number = experiment.trialDetails.trial_number
        return trial_number

    def get_trial_json_object(self, id_):
        """Return the current trial's tunables as a JSON string, or None for
        an unsupported algorithm."""
        experiment = self.getExperiment(id_)
        trial_json_object = None
        if experiment.hpo_algo_impl in self.SUPPORTED_ALGOS:
            with experiment.resultsAvailableCond:
                trial_json_object = json.dumps(experiment.trialDetails.trial_json_object)
        return trial_json_object

    def set_result(self, id_, trial_result, result_value_type, result_value):
        """Record the result of the current trial and wake the waiting Objective."""
        experiment = self.getExperiment(id_)
        if experiment.hpo_algo_impl in self.SUPPORTED_ALGOS:
            with experiment.resultsAvailableCond:
                experiment.trialDetails.trial_result = trial_result
                experiment.trialDetails.result_value_type = result_value_type
                experiment.trialDetails.result_value = result_value
                experiment.resultsAvailableCond.notify()


instance: HpoService = HpoService();
Loading