Batch sampling improvement #1154

Open · wants to merge 17 commits into base: development
Changes from 6 commits
80 changes: 80 additions & 0 deletions smac/main/config_selector.py
@@ -16,6 +16,7 @@
from smac.callback.callback import Callback
from smac.initial_design import AbstractInitialDesign
from smac.model.abstract_model import AbstractModel
from smac.model.gaussian_process import GaussianProcess
from smac.random_design.abstract_random_design import AbstractRandomDesign
from smac.runhistory.encoder.abstract_encoder import AbstractRunHistoryEncoder
from smac.runhistory.runhistory import RunHistory
@@ -44,6 +45,14 @@ class ConfigSelector:
the highest budgets are checked first. For example, if min_trials is three, but we find only
two trials in the runhistory for the highest budget, we will use trials of a lower budget
instead.
batch_sampling_estimation_strategy: str, defaults to no_estimate
Batch sampling setting, applied in the parallel setting. During batch sampling, the ConfigSelector might need
to suggest new configurations while other configurations are still running. This argument determines whether we
make use of that information and fantasize estimations for the running configurations. If no_estimate is used,
the information from the running configurations is ignored. If the strategy is kriging_believer, we use the
predicted mean of our surrogate model as the estimation for the running configurations. If the strategy is
CL_min/mean/max (constant liar), we use the min/mean/max of the existing evaluations as the estimation. If the
strategy is sample, we draw the estimation from our surrogate model (in this case, only a GP is allowed).
"""

def __init__(
@@ -53,6 +62,7 @@ def __init__(
retrain_after: int = 8,
retries: int = 16,
min_trials: int = 1,
batch_sampling_estimation_strategy: str = "no_estimate",
) -> None:
# Those are the configs sampled from the passed initial design
# Selecting configurations from initial design
@@ -82,6 +92,9 @@ def __init__(
# Processed configurations should be stored here; this is important to not return the same configuration twice
self._processed_configs: list[Configuration] = []

# For the batch sampling setting
self._batch_sampling_estimation_strategy = batch_sampling_estimation_strategy

def _set_components(
self,
initial_design: AbstractInitialDesign,
@@ -284,6 +297,23 @@ def _collect_data(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
# Possibly add running configs?
configs_array = self._runhistory_encoder.get_configurations(budget_subset=self._considered_budgets)

# Add running configurations.
# If our batch size is 1, no running configuration should exist and this part can be skipped,
# so there is no need to check the number of workers in this case.

X_running = self._runhistory_encoder.transform_running_configs(budget_subset=[b])
if self._batch_sampling_estimation_strategy != 'no_estimate':
Y_estimated = self.estimate_running_config_costs(
X_running, Y, self._batch_sampling_estimation_strategy
)
if Y_estimated is not None:
configs_array_running = self._runhistory_encoder.get_running_configurations(
budget_subset=self._considered_budgets
)
X = np.concatenate([X, X_running], axis=0)
Y = np.concatenate([Y, Y_estimated], axis=0)
configs_array = np.concatenate([configs_array, configs_array_running], axis=0)

return X, Y, configs_array

return (
@@ -300,6 +330,56 @@ def _get_evaluated_configs(self) -> list[Configuration]:
assert self._runhistory is not None
return self._runhistory.get_configs_per_budget(budget_subset=self._considered_budgets)

def estimate_running_config_costs(
self,
X_running: np.ndarray,
Y_evaluated: np.ndarray,
estimation_strategy: str = 'CL_max',
) -> np.ndarray | None:
"""
This function is implemented to estimate the still pending/ running configurations
benjamc marked this conversation as resolved.
Show resolved Hide resolved
Parameters
----------
X_running : np.ndarray
a np array with size (n_running_configs, D) that represents the array values of the running configurations
Y_evaluated : np.ndarray
a np array with size (n_evaluated_configs, n_obj) that records the costs of all the previous evaluated
configurations
estimation_strategy: str
how do we estimate the target y_running values
benjamc marked this conversation as resolved.
Show resolved Hide resolved

Returns
-------
Y_running_estimated : np.ndarray
the estimated running y values
"""
n_running_points = len(X_running)
if n_running_points == 0:
return None
if estimation_strategy == 'CL_max':
# constant liar max, we take the maximal values of all the evaluated Y and apply them to the running X
Y_estimated = np.nanmax(Y_evaluated, axis=0, keepdims=True)
return np.repeat(Y_estimated, n_running_points, 0)
elif estimation_strategy == 'CL_min':
# constant liar min, we take the minimal values of all the evaluated Y and apply them to the running X
Y_estimated = np.nanmin(Y_evaluated, axis=0, keepdims=True)
return np.repeat(Y_estimated, n_running_points, 0)
elif estimation_strategy == 'CL_mean':
# constant liar mean, we take the mean values of all the evaluated Y and apply them to the running X
Y_estimated = np.nanmean(Y_evaluated, axis=0, keepdims=True)
return np.repeat(Y_estimated, n_running_points, 0)
elif estimation_strategy == 'kriging_believer':
# in kriging believer, we apply the predicted means of the surrogate model to estimate the running X
return self._model.predict_marginalized(X_running)[0]
elif estimation_strategy == 'sample':
# https://papers.nips.cc/paper_files/paper/2012/file/05311655a15b75fab86956663e1819cd-Paper.pdf
# since this requires a multivariate Gaussian distribution, we need to restrict the model to be a
# Gaussian process
assert isinstance(self._model, GaussianProcess), 'Sample based estimate strategy only allows ' \
'GP as surrogate model!'
return self._model.sample_functions(X_test=X_running, n_funcs=1)
Collaborator: Why can't we sample from the random forest?
else:
raise ValueError(f'Unknown estimation strategy: {estimation_strategy}')
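For illustration, a minimal standalone NumPy sketch of the constant-liar estimates above (the cost values are made up for this example and are not part of the patch):

import numpy as np

# Costs of three already evaluated configurations (single objective)
Y_evaluated = np.array([[0.7], [0.2], [1.5]])
n_running = 2  # two configurations are still running

# Each constant-liar variant assigns the same "lie" to every running configuration
cl_max = np.repeat(np.nanmax(Y_evaluated, axis=0, keepdims=True), n_running, 0)    # [[1.5], [1.5]]
cl_min = np.repeat(np.nanmin(Y_evaluated, axis=0, keepdims=True), n_running, 0)    # [[0.2], [0.2]]
cl_mean = np.repeat(np.nanmean(Y_evaluated, axis=0, keepdims=True), n_running, 0)  # [[0.8], [0.8]]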

def _get_x_best(self, X: np.ndarray) -> tuple[np.ndarray, float]:
"""Get value, configuration, and array representation of the *best* configuration.

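As a usage note, a minimal sketch of how the new argument would be enabled from user code, mirroring the construction used in the tests added in this PR (configspace and target_function are placeholders for the user's own problem):

from smac import BlackBoxFacade, Scenario
from smac.main.config_selector import ConfigSelector

scenario = Scenario(configspace, n_trials=100, n_workers=2, deterministic=True)
smac = BlackBoxFacade(
    scenario,
    target_function,
    config_selector=ConfigSelector(
        scenario=scenario,
        retrain_after=1,
        batch_sampling_estimation_strategy="kriging_believer",  # or CL_min / CL_mean / CL_max / sample / no_estimate
    ),
    overwrite=True,
)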
68 changes: 65 additions & 3 deletions smac/runhistory/encoder/abstract_encoder.py
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Any, Mapping
from typing import Any, Iterable, Mapping

import numpy as np

@@ -188,6 +188,29 @@ def _get_considered_trials(

return trials

def _get_running_trials(
self,
budget_subset: list | None = None,
) -> dict[TrialKey, TrialValue]:
"""Returns all trials that are still running."""
if budget_subset is not None:
trials = {
trial: self.runhistory[trial]
for trial in self.runhistory
if self.runhistory[trial].status == StatusType.RUNNING
# and runhistory.data[run].time >= self._algorithm_walltime_limit # type: ignore
Collaborator: Why is this commented out / why would we need this? If it should stay commented out, please explain why.

and trial.budget in budget_subset
}
else:
trials = {
trial: self.runhistory[trial]
for trial in self.runhistory
if self.runhistory[trial].status == StatusType.RUNNING
# and runhistory.data[run].time >= self._algorithm_walltime_limit # type: ignore
Collaborator: Same here.

}

return trials

def _get_timeout_trials(
self,
budget_subset: list | None = None,
@@ -211,6 +234,13 @@ def _get_timeout_trials(

return trials

def _convert_config_ids_to_array(self, config_ids: Iterable[int]) -> np.ndarray:
"""Extract the configurations with the given ids from the runhistory and convert them into an array."""
configurations = [self.runhistory._ids_config[config_id] for config_id in config_ids]
configs_array = convert_configurations_to_array(configurations)
return configs_array

def get_configurations(
self,
budget_subset: list | None = None,
@@ -236,11 +266,31 @@ def get_configurations(
t_trials = self._get_timeout_trials(budget_subset)
t_config_ids = set(t_trial.config_id for t_trial in t_trials)
config_ids = s_config_ids | t_config_ids
configurations = [self.runhistory._ids_config[config_id] for config_id in config_ids]
configs_array = convert_configurations_to_array(configurations)
configs_array = self._convert_config_ids_to_array(config_ids)

return configs_array

def get_running_configurations(
self,
budget_subset: list | None = None,
) -> np.ndarray:
"""Returns vector representation of the configurations that are still running.

Parameters
----------
budget_subset : list | None, defaults to None
List of budgets to consider.

Returns
-------
X : np.ndarray
Configuration vector and instance features.
"""
r_trials = self._get_running_trials(budget_subset)
r_ids = set(r_trial.config_id for r_trial in r_trials)
configs_array = self._convert_config_ids_to_array(r_ids)
return configs_array

def transform(
self,
budget_subset: list | None = None,
@@ -282,6 +332,18 @@ def transform(
logger.debug("Converted %d observations." % (X.shape[0]))
return X, Y

def transform_running_configs(
self,
budget_subset: list | None = None,
) -> np.ndarray:
"""Return the running configurations"""
logger.debug("Transforming Running Configurations into X format...")
running_trials = self._get_running_trials(budget_subset)
# Y is not required for running configurations
X, _ = self._build_matrix(trials=running_trials, store_statistics=True)
logger.debug("Converted %d running observations." % (X.shape[0]))
return X

@abstractmethod
def transform_response_values(
self,
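For context, a condensed sketch of how these encoder additions are consumed by the config selector above; fantasize_running is a hypothetical helper written only for illustration:

import numpy as np

def fantasize_running(encoder, selector, X, Y, budget):
    """Append estimated costs for still-running configs to the training data (sketch of the _collect_data change)."""
    X_running = encoder.transform_running_configs(budget_subset=[budget])
    Y_running = selector.estimate_running_config_costs(X_running, Y, "kriging_believer")
    if Y_running is None:  # nothing is currently running
        return X, Y
    return np.concatenate([X, X_running], axis=0), np.concatenate([Y, Y_running], axis=0)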
101 changes: 101 additions & 0 deletions tests/test_main/test_config_selector.py
@@ -0,0 +1,101 @@
from __future__ import annotations

import numpy as np
import pytest

from smac import BlackBoxFacade, Scenario
from smac.acquisition.function.confidence_bound import LCB
from smac.main.config_selector import ConfigSelector
from smac.runhistory.dataclasses import TrialValue


def test_estimated_config_values_are_trained_by_models(rosenbrock):
scenario = Scenario(rosenbrock.configspace, n_trials=100, n_workers=2, deterministic=True)
smac = BlackBoxFacade(
scenario,
rosenbrock.train, # We pass the target function here
overwrite=True, # Overrides any previous results that are inconsistent with the meta-data
config_selector=ConfigSelector(
scenario=scenario,
retrain_after=1,
batch_sampling_estimation_strategy='no_estimate'
),
initial_design=BlackBoxFacade.get_initial_design(scenario=scenario, n_configs=5),
acquisition_function=LCB() # this ensures that we can track the number of data points in the acquisition function
)
# we first initialize multiple configurations as the starting points

n_data_in_acq_func = 5
for _ in range(n_data_in_acq_func):
info = smac.ask() # we need the seed from the configuration

cost = rosenbrock.train(info.config, seed=info.seed, budget=info.budget, instance=info.instance)
value = TrialValue(cost=cost, time=0.5)

smac.tell(info, value)

# For the naive approach (no_estimate), no configuration values are hallucinated
all_asked_infos = []
for i in range(3):
all_asked_infos.append(smac.ask())
assert smac._acquisition_function._num_data == n_data_in_acq_func

# Each time we provide a new running configuration, we can estimate the configuration values for the new
# suggestions and use this information to retrain our model. Hence, each time a new point is asked,
# _num_data of the LCB acquisition function should increase by 1.

n_data_in_acq_func += 3
for estimate_strategy in ['CL_max', 'CL_min', 'CL_mean', 'kriging_believer', 'sample']:
smac._config_selector._batch_sampling_estimation_strategy = estimate_strategy
for i in range(3):
all_asked_infos.append(smac.ask())
assert smac._acquisition_function._num_data == n_data_in_acq_func
n_data_in_acq_func += 1

for info in all_asked_infos:
value = TrialValue(cost=rosenbrock.train(info.config, instance=info.instance, seed=info.seed), )
smac.tell(info=info, value=value)

# Now we return to the vanilla approach: all evaluations are exact evaluations, so the number of
# data points in the acquisition function should not increase.
_ = smac.ask()
assert smac._acquisition_function._num_data == n_data_in_acq_func


@pytest.mark.parametrize("estimation_strategy", ['CL_max', 'CL_min', 'CL_mean', 'kriging_believer', 'sample'])
def test_batch_estimation_methods(rosenbrock, estimation_strategy):
config_space = rosenbrock.configspace
scenario = Scenario(config_space, n_trials=100, n_workers=2, deterministic=True)
config_selector = ConfigSelector(
scenario=scenario,
retrain_after=1,
batch_sampling_estimation_strategy=estimation_strategy
)
model = BlackBoxFacade.get_model(scenario=scenario)
X_evaluated = config_space.sample_configuration(5)
y_train = np.asarray([rosenbrock.train(x) for x in X_evaluated])
x_train = np.asarray([x.get_array() for x in X_evaluated])

model.train(x_train, y_train)

X_running = np.asarray([x.get_array() for x in config_space.sample_configuration(3)])
config_selector._model = model

estimations = config_selector.estimate_running_config_costs(
X_running, y_train, estimation_strategy=estimation_strategy,
)
if estimation_strategy == 'CL_max':
assert (estimations == y_train.max()).all()
elif estimation_strategy == 'CL_min':
assert (estimations == y_train.min()).all()
elif estimation_strategy == 'CL_mean':
assert (estimations == y_train.mean()).all()
elif estimation_strategy == 'kriging_believer':
assert np.allclose(model.predict_marginalized(X_running)[0], estimations)
else:
# for the sampling strategy, we simply check that the result has the expected shape
assert np.equal(estimations.shape, (3, 1)).all()