Source code for bigdl.orca.automl.xgboost.auto_xgb

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either exp'
# ress or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from bigdl.orca.automl.xgboost.XGBoost import XGBoostModelBuilder
from bigdl.orca.automl.auto_estimator import AutoEstimator
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union
if TYPE_CHECKING:
    from ray.tune.sample import Float, Function, Integer
    from functools import partial
    from pyspark.sql import DataFrame
    from numpy import ndarray


[docs]class AutoXGBClassifier(AutoEstimator): def __init__(self, logs_dir: str="/tmp/auto_xgb_classifier_logs", cpus_per_trial: int=1, name: Optional[str]=None, remote_dir: Optional[str]=None, **xgb_configs ) -> None: """ Automated xgboost classifier Example: >>> search_space = {"n_estimators": hp.grid_search([50, 1000]), "max_depth": hp.grid_search([2, 15]), "lr": hp.loguniform(1e-4, 1e-1)} >>> auto_xgb_clf = AutoXGBClassifier(cpus_per_trial=4, name="auto_xgb_classifier", **config) >>> auto_xgb_clf.fit(data=(X_train, y_train), validation_data=(X_val, y_val), metric="error", metric_mode="min", n_sampling=1, search_space=search_space) >>> best_model = auto_xgb_clf.get_best_model() :param logs_dir: Local directory to save logs and results. It defaults to "/tmp/auto_xgb_classifier_logs" :param cpus_per_trial: Int. Number of cpus for each trial. It defaults to 1. The value will also be assigned to n_jobs in xgboost, which is the number of parallel threads used to run xgboost. :param name: Name of the auto xgboost classifier. :param remote_dir: String. Remote directory to sync training results and checkpoints. It defaults to None and doesn't take effects while running in local. While running in cluster, it defaults to "hdfs:///tmp/{name}". :param xgb_configs: Other scikit learn xgboost parameters. You may refer to https://xgboost.readthedocs.io/en/latest/python/python_api.html#module-xgboost.sklearn for the parameter names to specify. Note that we will directly use cpus_per_trial value for n_jobs in xgboost and you shouldn't specify n_jobs again. """ xgb_model_builder = XGBoostModelBuilder(model_type='classifier', cpus_per_trial=cpus_per_trial, **xgb_configs) resources_per_trial = {"cpu": cpus_per_trial} if cpus_per_trial else None super().__init__(model_builder=xgb_model_builder, logs_dir=logs_dir, resources_per_trial=resources_per_trial, remote_dir=remote_dir, name=name)
[docs] def fit(self, data: Union["partial", Tuple["ndarray", "ndarray"], "DataFrame"], epochs: int=1, validation_data: Optional[Union["partial", Tuple["ndarray", "ndarray"], "DataFrame"]]=None, metric: Optional[Union[Callable, str]]=None, metric_mode: Optional[str]=None, metric_threshold: Optional[Union[int, float]]=None, n_sampling: int=1, search_space: Optional[Dict]=None, search_alg: Optional[str]=None, search_alg_params: Optional[Dict]=None, scheduler: Optional[str]=None, scheduler_params: Optional[Dict]=None, feature_cols: Optional[List[str]]=None, label_cols: Optional[List[str]]=None, ) -> None: """ Automatically fit the model and search for the best hyperparameters. :param data: A Spark DataFrame, a tuple of ndarrays or a function. If data is a tuple of ndarrays, it should be in the form of (x, y), where x is training input data and y is training target data. If data is a function, it should takes config as argument and returns a tuple of ndarrays in the form of (x, y). :param epochs: Max number of epochs to train in each trial. Defaults to 1. If you have also set metric_threshold, a trial will stop if either it has been optimized to the metric_threshold or it has been trained for {epochs} epochs. :param validation_data: Validation data. Validation data type should be the same as data. :param metric: String or customized evaluation metric function. If string, metric is the evaluation metric name to optimize, e.g. "mse". If callable function, it signature should be func(y_true, y_pred), where y_true and y_pred are numpy ndarray. The function should return a float value as evaluation result. :param metric_mode: One of ["min", "max"]. "max" means greater metric value is better. You have to specify metric_mode if you use a customized metric function. You don't have to specify metric_mode if you use the built-in metric in bigdl.orca.automl.metrics.Evaluator. :param metric_threshold: a trial will be terminated when metric threshold is met :param n_sampling: Number of times to sample from the search_space. Defaults to 1. If hp.grid_search is in search_space, the grid will be repeated n_sampling of times. If this is -1, (virtually) infinite samples are generated until a stopping condition is met. :param search_space: a dict for search space :param search_alg: str, all supported searcher provided by ray tune (i.e."variant_generator", "random", "ax", "dragonfly", "skopt", "hyperopt", "bayesopt", "bohb", "nevergrad", "optuna", "zoopt" and "sigopt") :param search_alg_params: extra parameters for searcher algorithm besides search_space, metric and searcher mode :param scheduler: str, all supported scheduler provided by ray tune :param scheduler_params: parameters for scheduler :param feature_cols: feature column names if data is Spark DataFrame. :param label_cols: target column names if data is Spark DataFrame. """ data, validation_data, feature_cols, label_cols = _merge_cols_for_spark_df(data, validation_data, feature_cols, label_cols) super().fit(data=data, epochs=epochs, validation_data=validation_data, metric=metric, metric_mode=metric_mode, metric_threshold=metric_threshold, n_sampling=n_sampling, search_space=search_space, search_alg=search_alg, search_alg_params=search_alg_params, scheduler=scheduler, scheduler_params=scheduler_params, feature_cols=feature_cols, label_cols=label_cols)
[docs]class AutoXGBRegressor(AutoEstimator): def __init__(self, logs_dir: str="/tmp/auto_xgb_regressor_logs", cpus_per_trial: int=1, name: Optional[str]=None, remote_dir: Optional[str]=None, **xgb_configs ) -> None: """ Automated xgboost regressor Example: >>> search_space = {"n_estimators": hp.grid_search([800, 1000]), "max_depth": hp.grid_search([10, 15]), "lr": hp.loguniform(1e-4, 1e-1), "min_child_weight": hp.choice([1, 2, 3]), } >>> auto_xgb_reg = AutoXGBRegressor(cpus_per_trial=2, name="auto_xgb_regressor", **config) >>> auto_xgb_reg.fit(data=(X_train, y_train), validation_data=(X_val, y_val), metric="rmse", n_sampling=1, search_space=search_space) >>> best_model = auto_xgb_reg.get_best_model() :param logs_dir: Local directory to save logs and results. It defaults to "/tmp/auto_xgb_classifier_logs" :param cpus_per_trial: Int. Number of cpus for each trial. The value will also be assigned to n_jobs, which is the number of parallel threads used to run xgboost. :param name: Name of the auto xgboost classifier. :param remote_dir: String. Remote directory to sync training results and checkpoints. It defaults to None and doesn't take effects while running in local. While running in cluster, it defaults to "hdfs:///tmp/{name}". :param xgb_configs: Other scikit learn xgboost parameters. You may refer to https://xgboost.readthedocs.io/en/latest/python/python_api.html#module-xgboost.sklearn for the parameter names to specify. Note that we will directly use cpus_per_trial value for n_jobs in xgboost and you shouldn't specify n_jobs again. """ xgb_model_builder = XGBoostModelBuilder(model_type='regressor', cpus_per_trial=cpus_per_trial, **xgb_configs) resources_per_trial = {"cpu": cpus_per_trial} if cpus_per_trial else None super().__init__(model_builder=xgb_model_builder, logs_dir=logs_dir, resources_per_trial=resources_per_trial, remote_dir=remote_dir, name=name)
[docs] def fit(self, data: Union["partial", Tuple["ndarray", "ndarray"], "DataFrame"], epochs: int=1, validation_data: Optional[Union["partial", Tuple["ndarray", "ndarray"], "DataFrame"]]=None, metric: Optional[Union[Callable, str]]=None, metric_mode: Optional[str]=None, metric_threshold: Optional[Union["float", "int"]]=None, n_sampling: int=1, search_space: Optional[Dict]=None, search_alg: Optional[str]=None, search_alg_params: Optional[Dict]=None, scheduler: Optional[str]=None, scheduler_params: Optional[Dict]=None, feature_cols: Optional[List[str]]=None, label_cols: Optional[List[str]]=None, ) -> None: """ Automatically fit the model and search for the best hyperparameters. :param data: A Spark DataFrame, a tuple of ndarrays or a function. If data is a tuple of ndarrays, it should be in the form of (x, y), where x is training input data and y is training target data. If data is a function, it should takes config as argument and returns a tuple of ndarrays in the form of (x, y). :param epochs: Max number of epochs to train in each trial. Defaults to 1. If you have also set metric_threshold, a trial will stop if either it has been optimized to the metric_threshold or it has been trained for {epochs} epochs. :param validation_data: Validation data. Validation data type should be the same as data. :param metric: String or customized evaluation metric function. If string, metric is the evaluation metric name to optimize, e.g. "mse". If callable function, it signature should be func(y_true, y_pred), where y_true and y_pred are numpy ndarray. The function should return a float value as evaluation result. :param metric_mode: One of ["min", "max"]. "max" means greater metric value is better. You have to specify metric_mode if you use a customized metric function. You don't have to specify metric_mode if you use the built-in metric in bigdl.orca.automl.metrics.Evaluator. :param metric_threshold: a trial will be terminated when metric threshold is met :param n_sampling: Number of times to sample from the search_space. Defaults to 1. If hp.grid_search is in search_space, the grid will be repeated n_sampling of times. If this is -1, (virtually) infinite samples are generated until a stopping condition is met. :param search_space: a dict for search space :param search_alg: str, all supported searcher provided by ray tune (i.e."variant_generator", "random", "ax", "dragonfly", "skopt", "hyperopt", "bayesopt", "bohb", "nevergrad", "optuna", "zoopt" and "sigopt") :param search_alg_params: extra parameters for searcher algorithm besides search_space, metric and searcher mode :param scheduler: str, all supported scheduler provided by ray tune :param scheduler_params: parameters for scheduler :param feature_cols: feature column names if data is Spark DataFrame. :param label_cols: target column names if data is Spark DataFrame. """ data, validation_data, feature_cols, label_cols = _merge_cols_for_spark_df(data, validation_data, feature_cols, label_cols) super().fit(data=data, epochs=epochs, validation_data=validation_data, metric=metric, metric_mode=metric_mode, metric_threshold=metric_threshold, n_sampling=n_sampling, search_space=search_space, search_alg=search_alg, search_alg_params=search_alg_params, scheduler=scheduler, scheduler_params=scheduler_params, feature_cols=feature_cols, label_cols=label_cols)
def _merge_cols_for_spark_df(data: Union["DataFrame", Tuple["ndarray", "ndarray"], "partial"], validation_data: Optional[Union["partial", Tuple["ndarray", "ndarray"], "DataFrame"]], feature_cols: Optional[List[str]], label_cols: Optional[List[str]]) -> Tuple: # merge feature_cols/label_cols to one column, to adapt to the meanings of feature_cols and # label_cols in AutoEstimator, which correspond to the model inputs/outputs. from pyspark.sql import DataFrame from pyspark.sql.functions import array def concat_cols(data, feature_cols, label_cols): combined_feature_name = "combined_features" combined_target_name = "combined_targets" data = data.select(array(*feature_cols).alias(combined_feature_name), array(*label_cols).alias(combined_target_name)) return data, combined_feature_name, combined_target_name feature_cols, label_cols = AutoEstimator._check_spark_dataframe_input(data, validation_data, feature_cols, label_cols) if isinstance(data, DataFrame): data, combined_feature_name, combined_target_name = concat_cols(data, feature_cols, label_cols) if validation_data is not None: validation_data, _, _ = concat_cols(validation_data, feature_cols, label_cols) feature_cols = [combined_feature_name] label_cols = [combined_target_name] return data, validation_data, feature_cols, label_cols