#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from bigdl.orca.automl.search import SearchEngineFactory
from bigdl.dllib.utils.log4Error import *
from bigdl.dllib.utils.log4Error import invalidInputError
from numpy import ndarray
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union, Any
if TYPE_CHECKING:
from bigdl.orca.automl.model.base_pytorch_model import ModelBuilder
from pyspark.sql import DataFrame
from ray.tune.sample import Categorical, Float, Integer, Function
class AutoEstimator:
    """
    Estimator that searches a hyperparameter space for the best model.

    An AutoEstimator wraps a model builder and a search engine (created with the
    "ray" backend). Use :meth:`from_torch` or :meth:`from_keras` to create one, call
    :meth:`fit` once to run the search, then retrieve the result with
    :meth:`get_best_model` or :meth:`get_best_config`.

    Example:
        >>> auto_est = AutoEstimator.from_torch(model_creator=model_creator,
        ...                                     optimizer=get_optimizer,
        ...                                     loss=nn.BCELoss(),
        ...                                     logs_dir="/tmp/zoo_automl_logs",
        ...                                     resources_per_trial={"cpu": 2},
        ...                                     name="test_fit")
        >>> auto_est.fit(data=data,
        ...              validation_data=validation_data,
        ...              search_space=create_linear_search_space(),
        ...              n_sampling=4,
        ...              epochs=1,
        ...              metric="accuracy")
        >>> best_model = auto_est.get_best_model()
    """

    def __init__(self,
                 model_builder: "ModelBuilder",
                 logs_dir: str = "/tmp/auto_estimator_logs",
                 resources_per_trial: Optional[Dict[str, int]] = None,
                 remote_dir: Optional[str] = None,
                 name: Optional[str] = None) -> None:
        """
        Create an AutoEstimator from a model builder.

        :param model_builder: builder used to construct a model from a config
            (its ``build(config)`` method is called when retrieving the best model).
        :param logs_dir: Local directory to save logs and results. It defaults to
            "/tmp/auto_estimator_logs"
        :param resources_per_trial: Dict. resources for each trial. e.g. {"cpu": 2}.
        :param remote_dir: String. Remote directory to sync training results and
            checkpoints. Passed through to the search engine.
        :param name: Name of the auto estimator. Passed through to the search engine.
        """
        self.model_builder = model_builder
        self.searcher = SearchEngineFactory.create_engine(
            backend="ray",
            logs_dir=logs_dir,
            resources_per_trial=resources_per_trial,
            remote_dir=remote_dir,
            name=name)
        # fit() may only run once per instance; see the guard at the top of fit().
        self._fitted = False
        # Lazily populated from the searcher the first time a best-* getter is called.
        self.best_trial = None

    @staticmethod
    def from_torch(*,
                   model_creator: Callable,
                   optimizer: Callable,
                   loss: Callable,
                   logs_dir: str = "/tmp/auto_estimator_logs",
                   resources_per_trial: Optional[Dict[str, int]] = None,
                   name: str = "auto_pytorch_estimator",
                   remote_dir: Optional[str] = None,
                   ) -> "AutoEstimator":
        """
        Create an AutoEstimator for torch.

        :param model_creator: PyTorch model creator function.
        :param optimizer: PyTorch optimizer creator function or pytorch optimizer name (string).
            Note that you should specify learning rate search space with key as "lr" or LR_NAME
            (from bigdl.orca.automl.pytorch_utils import LR_NAME) if input optimizer name.
            Without learning rate search space specified, the default learning rate value of 1e-3
            will be used for all estimators.
        :param loss: PyTorch loss instance or PyTorch loss creator function
            or pytorch loss name (string).
        :param logs_dir: Local directory to save logs and results. It defaults to
            "/tmp/auto_estimator_logs"
        :param resources_per_trial: Dict. resources for each trial. e.g. {"cpu": 2}.
        :param name: Name of the auto estimator. It defaults to "auto_pytorch_estimator"
        :param remote_dir: String. Remote directory to sync training results and checkpoints. It
            defaults to None and doesn't take effects while running in local. While running in
            cluster, it defaults to "hdfs:///tmp/{name}".

        :return: an AutoEstimator object.
        """
        # Imported here so that the torch-specific builder is only loaded when needed.
        from bigdl.orca.automl.model.base_pytorch_model import PytorchModelBuilder
        model_builder = PytorchModelBuilder(model_creator=model_creator,
                                            optimizer_creator=optimizer,
                                            loss_creator=loss)

        return AutoEstimator(model_builder=model_builder,
                             logs_dir=logs_dir,
                             resources_per_trial=resources_per_trial,
                             remote_dir=remote_dir,
                             name=name)

    @staticmethod
    def from_keras(*,
                   model_creator: Callable,
                   logs_dir: str = "/tmp/auto_estimator_logs",
                   resources_per_trial: Optional[Dict[str, int]] = None,
                   name: str = "auto_keras_estimator",
                   remote_dir: Optional[str] = None,
                   ) -> "AutoEstimator":
        """
        Create an AutoEstimator for tensorflow keras.

        :param model_creator: Tensorflow keras model creator function.
        :param logs_dir: Local directory to save logs and results. It defaults to
            "/tmp/auto_estimator_logs"
        :param resources_per_trial: Dict. resources for each trial. e.g. {"cpu": 2}.
        :param name: Name of the auto estimator. It defaults to "auto_keras_estimator"
        :param remote_dir: String. Remote directory to sync training results and checkpoints. It
            defaults to None and doesn't take effects while running in local. While running in
            cluster, it defaults to "hdfs:///tmp/{name}".

        :return: an AutoEstimator object.
        """
        # Imported here so that the keras-specific builder is only loaded when needed.
        from bigdl.orca.automl.model.base_keras_model import KerasModelBuilder
        model_builder = KerasModelBuilder(model_creator=model_creator)
        return AutoEstimator(model_builder=model_builder,
                             logs_dir=logs_dir,
                             resources_per_trial=resources_per_trial,
                             remote_dir=remote_dir,
                             name=name)

    def fit(self,
            data: Union[Callable, Tuple["ndarray", "ndarray"], "DataFrame"],
            epochs: int = 1,
            validation_data: Optional[
                Union[Callable, Tuple["ndarray", "ndarray"], "DataFrame"]] = None,
            metric: Optional[Union[Callable, str]] = None,
            metric_mode: Optional[str] = None,
            metric_threshold: Optional[Union["Function", "float", "int"]] = None,
            n_sampling: int = 1,
            search_space: Optional[Dict] = None,
            search_alg: Optional[str] = None,
            search_alg_params: Optional[Dict] = None,
            scheduler: Optional[str] = None,
            scheduler_params: Optional[Dict] = None,
            feature_cols: Optional[List[str]] = None,
            label_cols: Optional[List[str]] = None,
            ) -> None:
        """
        Automatically fit the model and search for the best hyperparameters.

        An AutoEstimator can only be fitted once; a second call is rejected through
        invalidInputError.

        :param data: train data.
            If the AutoEstimator is created with from_torch, data can be a tuple of
            ndarrays or a PyTorch DataLoader or a function that takes a config dictionary as
            parameter and returns a PyTorch DataLoader.
            If the AutoEstimator is created with from_keras, data can be a tuple of
            ndarrays or a function that takes a config dictionary as
            parameter and returns a Tensorflow Dataset.
            If data is a tuple of ndarrays, it should be in the form of (x, y),
            where x is training input data and y is training target data.
            If data is a Spark DataFrame, feature_cols and label_cols must be given.
        :param epochs: Max number of epochs to train in each trial. Defaults to 1.
            If you have also set metric_threshold, a trial will stop if either it has been
            optimized to the metric_threshold or it has been trained for {epochs} epochs.
        :param validation_data: Validation data. Validation data type should be the same as data.
        :param metric: String or customized evaluation metric function.
            If string, metric is the evaluation metric name to optimize, e.g. "mse".
            If callable function, it signature should be func(y_true, y_pred), where y_true and
            y_pred are numpy ndarray. The function should return a float value as evaluation
            result.
        :param metric_mode: One of ["min", "max"]. "max" means greater metric value is better.
            You have to specify metric_mode if you use a customized metric function.
            You don't have to specify metric_mode if you use the built-in metric in
            bigdl.orca.automl.metrics.Evaluator.
        :param metric_threshold: a trial will be terminated when metric threshold is met
        :param n_sampling: Number of times to sample from the search_space. Defaults to 1.
            If hp.grid_search is in search_space, the grid will be repeated n_sampling of times.
            If this is -1, (virtually) infinite samples are generated
            until a stopping condition is met.
        :param search_space: a dict for search space
        :param search_alg: str, all supported searcher provided by ray tune
               (i.e."variant_generator", "random", "ax", "dragonfly", "skopt",
               "hyperopt", "bayesopt", "bohb", "nevergrad", "optuna", "zoopt" and
               "sigopt")
        :param search_alg_params: extra parameters for searcher algorithm besides search_space,
            metric and searcher mode
        :param scheduler: str, all supported scheduler provided by ray tune
        :param scheduler_params: parameters for scheduler
        :param feature_cols: feature column names if data is Spark DataFrame.
        :param label_cols: target column names if data is Spark DataFrame.
        """
        if self._fitted:
            invalidInputError(False,
                              "This AutoEstimator has already been fitted and cannot fit again.")

        # Validate/normalize user input before handing everything to the search engine.
        metric_mode = AutoEstimator._validate_metric_mode(metric, metric_mode)
        feature_cols, label_cols = AutoEstimator._check_spark_dataframe_input(data,
                                                                              validation_data,
                                                                              feature_cols,
                                                                              label_cols)

        self.searcher.compile(data=data,
                              model_builder=self.model_builder,
                              epochs=epochs,
                              validation_data=validation_data,
                              metric=metric,
                              metric_mode=metric_mode,
                              metric_threshold=metric_threshold,
                              n_sampling=n_sampling,
                              search_space=search_space,
                              search_alg=search_alg,
                              search_alg_params=search_alg_params,
                              scheduler=scheduler,
                              scheduler_params=scheduler_params,
                              feature_cols=feature_cols,
                              label_cols=label_cols)
        self.searcher.run()
        # Only marked as fitted once the search has run without raising.
        self._fitted = True

    def get_best_model(self):
        """
        Return the best model found by the AutoEstimator

        :return: the best model instance
        """
        # Same build-and-restore path as the internal accessor; only the wrapped
        # ``model`` attribute is exposed to the caller.
        return self._get_best_automl_model().model

    def get_best_config(self):
        """
        Return the best config found by the AutoEstimator

        :return: A dictionary of best hyper parameters
        """
        return self._get_best_trial().config

    def _get_best_trial(self):
        """
        This is for internal use only.
        Return the best trial, asking the searcher for it on first use and caching
        the result in ``self.best_trial``.

        :return: the best trial object reported by the searcher
        """
        if not self.best_trial:
            self.best_trial = self.searcher.get_best_trial()
        return self.best_trial

    def _get_best_automl_model(self):
        """
        This is for internal use only.
        Return the best automl model found by the AutoEstimator

        A new model is built from the best trial's config and then restored from the
        best trial's model path on every call.

        :return: an automl base model instance
        """
        best_trial = self._get_best_trial()
        best_automl_model = self.model_builder.build(best_trial.config)
        best_automl_model.restore(best_trial.model_path)
        return best_automl_model

    @staticmethod
    def _validate_metric_mode(metric: Optional[Union[Callable, str]],
                              mode: Optional[str]) -> Optional[str]:
        """
        Validate ``mode``, inferring it from ``metric`` when it is not given.

        :param metric: metric name or customized metric function.
        :param mode: requested metric mode, or None/empty to infer it from ``metric``.

        :return: the validated mode, one of "min" or "max".

        Invalid input (a callable metric without a mode, a metric name whose mode
        cannot be inferred, or a mode outside ["min", "max"]) is reported through
        invalidInputError.
        """
        if not mode:
            if callable(metric):
                # A user-defined function carries no hint about its direction.
                invalidInputError(False,
                                  "You must specify `metric_mode` for your metric function")
            from bigdl.orca.automl.metrics import Evaluator
            try:
                mode = Evaluator.get_metric_mode(metric)
            except ValueError:
                # Deliberately ignored: an un-inferred mode is reported just below
                # with a message that tells the user how to fix it.
                pass
            if not mode:
                invalidInputError(False,
                                  f"We cannot infer metric mode with metric name of {metric}."
                                  f" Please specify the `metric_mode` parameter in"
                                  f" AutoEstimator.fit().")
        if mode not in ["min", "max"]:
            invalidInputError(False,
                              "`mode` has to be one of ['min', 'max']")
        return mode

    @staticmethod
    def _check_spark_dataframe_input(data: Union[Tuple["ndarray", "ndarray"],
                                                 Callable, "DataFrame"],
                                     validation_data: Optional[Union[Callable,
                                                                     Tuple["ndarray", "ndarray"],
                                                                     "DataFrame"]],
                                     feature_cols: Optional[List[str]],
                                     label_cols: Optional[List[str]]
                                     ) -> Tuple[Optional[List[str]], Optional[List[str]]]:
        """
        Validate column arguments when ``data`` is a Spark DataFrame.

        When ``data`` is a Spark DataFrame, ``feature_cols`` and ``label_cols`` must be
        non-empty and are normalized to lists (a single string becomes a one-element
        list), and ``validation_data``, if given, must also be a Spark DataFrame.
        For any other kind of ``data`` the column arguments are returned unchanged.

        :param data: train data passed to fit().
        :param validation_data: validation data passed to fit().
        :param feature_cols: feature column name(s).
        :param label_cols: label column name(s).

        :return: a tuple of (feature_cols, label_cols).
        """
        def check_cols(cols, cols_name):
            # Reject missing/empty values first, then accept a list as-is and wrap
            # a single column name into a list.
            if not cols:
                invalidInputError(False,
                                  f"You must input valid {cols_name} for Spark DataFrame"
                                  f" data input")
            if isinstance(cols, list):
                return cols
            if not isinstance(cols, str):
                invalidInputError(False,
                                  f"{cols_name} should be a string or a list of strings, "
                                  f"but got {type(cols)}")
            return [cols]

        # Imported at call time; note this runs for every kind of input, so pyspark
        # has to be importable even when data is not a DataFrame.
        from pyspark.sql import DataFrame
        if isinstance(data, DataFrame):
            feature_cols = check_cols(feature_cols, cols_name="feature_cols")
            label_cols = check_cols(label_cols, cols_name="label_cols")
            if validation_data:
                if not isinstance(validation_data, DataFrame):
                    # Report the type of the offending argument (validation_data);
                    # data is known to be a DataFrame in this branch.
                    invalidInputError(False,
                                      f"data and validation_data should be both Spark DataFrame, "
                                      f"but got validation_data of type {type(validation_data)}")
        return feature_cols, label_cols