Source code for bigdl.chronos.autots.autotsestimator

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import types

from bigdl.orca.automl.auto_estimator import AutoEstimator
from bigdl.chronos.data import TSDataset
import bigdl.orca.automl.hp as hp
from bigdl.chronos.autots.model import AutoModelFactory
from bigdl.chronos.autots.utils import recalculate_n_sampling


class AutoTSEstimator:
    """
    Automated TimeSeries Estimator for time series forecasting tasks, which supports
    TSDataset and customized data creators as data input, on a built-in model (only
    "lstm", "tcn", "seq2seq" for now) or a 3rd party model.

    >>> # Here is a use case example:
    >>> # prepare train/valid/test tsdataset
    >>> autoest = AutoTSEstimator(model="lstm",
    >>>                           search_space=search_space,
    >>>                           past_seq_len=6,
    >>>                           future_seq_len=1)
    >>> tsppl = autoest.fit(data=tsdata_train,
    >>>                     validation_data=tsdata_valid)
    >>> tsppl.predict(tsdata_test)
    >>> tsppl.save("my_tsppl")
    """

    def __init__(self,
                 model="lstm",
                 search_space=dict(),
                 metric="mse",
                 metric_mode=None,
                 loss=None,
                 optimizer="Adam",
                 past_seq_len='auto',
                 future_seq_len=1,
                 input_feature_num=None,
                 output_target_num=None,
                 selected_features="auto",
                 backend="torch",
                 logs_dir="/tmp/autots_estimator",
                 cpus_per_trial=1,
                 name="autots_estimator",
                 remote_dir=None,
                 ):
        """
        AutoTSEstimator trains a model for time series forecasting. Users can choose
        one of the built-in models, or pass in a customized pytorch or keras model
        for tuning using AutoML.

        :param model: a string or a model creation function. A string indicates a
               built-in model; currently "lstm", "tcn" and "seq2seq" are supported.
               A model creation function indicates a 3rd party model; the function
               should take a config param and return a torch.nn.Module
               (backend="torch") / tf model (backend="keras"). If you use
               chronos.data.TSDataset as the data input, the 3rd party model should
               have a 3-dim input (num_sample, past_seq_len, input_feature_num) and
               a 3-dim output (num_sample, future_seq_len, output_feature_num), and
               use the same keys in the model creation function. If you use a
               customized data creator, the output of the data creator should fit
               the input of the model creation function.
        :param search_space: str or dict. Hyperparameter configurations. For str,
               you can choose from "minimal", "normal" or "large", each representing
               a default search space for our built-in models with a different
               computing requirement. For dict, read the API docs of each auto
               model. Some common hyperparameters can be set explicitly as named
               parameters of this constructor; search_space should contain, as its
               keys, those parameters other than the keyword arguments of this
               constructor. If a 3rd party model is used, you must set search_space
               to a dict.
        :param metric: String or customized evaluation metric function. If string,
               metric is the evaluation metric name to optimize, e.g. "mse". If a
               callable function, its signature should be func(y_true, y_pred),
               where y_true and y_pred are numpy ndarrays. The function should
               return a float value as the evaluation result.
        :param metric_mode: One of ["min", "max"]. "max" means a greater metric
               value is better. You have to specify metric_mode if you use a
               customized metric function. You don't have to specify metric_mode if
               you use a built-in metric from bigdl.orca.automl.metrics.Evaluator.
        :param loss: String or pytorch loss instance or pytorch loss creator
               function. The default loss function for the pytorch backend is
               nn.MSELoss(). If users use backend="keras" and a 3rd party model,
               this parameter will be ignored.
        :param optimizer: String or pytorch optimizer creator function or tf.keras
               optimizer instance. If users use backend="keras" and a 3rd party
               model, this parameter will be ignored.
        :param past_seq_len: Int or hp sampling function. The number of historical
               steps (i.e. lookback) used for forecasting. For hp sampling, see
               bigdl.orca.automl.hp for more details. The value defaults to 'auto',
               which will automatically infer the cycle length of each time series
               and take the mode of them. The search space will then be
               automatically set to hp.randint(0.5*cycle_length, 2*cycle_length).
        :param future_seq_len: Int or List. The number of future steps to forecast.
               The value defaults to 1. If `future_seq_len` is a list, we will
               sample discretely according to the input list. 1 means the timestamp
               just after the observed data.
        :param input_feature_num: Int. The number of features in the input. The
               value is ignored if you use chronos.data.TSDataset as the input data
               type.
        :param output_target_num: Int. The number of targets in the output. The
               value is ignored if you use chronos.data.TSDataset as the input data
               type.
        :param selected_features: String. Only "all" and "auto" are supported for
               now. For "all", all features that are generated are used for each
               trial. For "auto", a subset is sampled randomly from all features for
               each trial. The parameter is ignored if not using
               chronos.data.TSDataset as the input data type. The value defaults to
               "auto".
        :param backend: The backend of the auto model. We only support backend as
               "torch" or "keras" for now.
        :param logs_dir: Local directory to save logs and results. It defaults to
               "/tmp/autots_estimator".
        :param cpus_per_trial: Int. Number of cpus for each trial. It defaults to 1.
        :param name: Name of the autots estimator. It defaults to
               "autots_estimator".
        :param remote_dir: String. Remote directory to sync training results and
               checkpoints. It defaults to None and doesn't take effect when running
               locally. When running in a cluster, it defaults to
               "hdfs:///tmp/{name}".
        """
        from bigdl.nano.utils.common import invalidInputError
        # check backend and set default loss MSE
        if backend == "torch":
            import torch
            if loss is None:
                loss = torch.nn.MSELoss()

        if isinstance(search_space, str):
            search_space = AutoModelFactory.get_default_search_space(model, search_space)

        self._future_seq_len = future_seq_len  # to support future_seq_len list input
        invalidInputError(isinstance(future_seq_len, int) or isinstance(future_seq_len, list),
                          f"future_seq_len only supports int or List, but found"
                          f" {type(future_seq_len)}")
        future_seq_len = future_seq_len if isinstance(future_seq_len, int)\
            else len(future_seq_len)

        # 3rd party model
        if isinstance(model, types.FunctionType):
            from bigdl.orca.automl.auto_estimator import AutoEstimator
            if backend == "torch":
                self.model = AutoEstimator.from_torch(model_creator=model,
                                                      optimizer=optimizer,
                                                      loss=loss,
                                                      logs_dir=logs_dir,
                                                      resources_per_trial={"cpu": cpus_per_trial},
                                                      name=name)
            if backend == "keras":
                self.model = AutoEstimator.from_keras(model_creator=model,
                                                      logs_dir=logs_dir,
                                                      resources_per_trial={"cpu": cpus_per_trial},
                                                      name=name)
            self.metric = metric
            self.metric_mode = metric_mode
            search_space.update({"past_seq_len": past_seq_len,
                                 "future_seq_len": future_seq_len,
                                 "input_feature_num": input_feature_num,
                                 "output_feature_num": output_target_num})
            self.search_space = search_space

        # built-in model
        if isinstance(model, str):
            # update auto model common search space
            search_space.update({"past_seq_len": past_seq_len,
                                 "future_seq_len": future_seq_len,
                                 "input_feature_num": input_feature_num,
                                 "output_target_num": output_target_num,
                                 "loss": loss,
                                 "metric": metric,
                                 "metric_mode": metric_mode,
                                 "optimizer": optimizer,
                                 "backend": backend,
                                 "logs_dir": logs_dir,
                                 "cpus_per_trial": cpus_per_trial,
                                 "name": name})

            # create auto model from name
            self.model = AutoModelFactory.create_auto_model(name=model,
                                                            search_space=search_space)

        # save selected features setting for data creator generation
        self.selected_features = selected_features
        self.backend = backend
        self._scaler = None
        self._scaler_index = None
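
    # Example (an illustrative sketch, not part of the original source): a 3rd party
    # pytorch model creator takes the trial's config dict and returns a
    # torch.nn.Module with 3-dim input/output. The config keys used below
    # ("past_seq_len", "input_feature_num", "future_seq_len", "output_feature_num")
    # are the keys this constructor puts into search_space; the hp.loguniform lr
    # range is an arbitrary choice for illustration.
    #
    # >>> import torch
    # >>> def my_model_creator(config):
    # >>>     return torch.nn.Sequential(
    # >>>         torch.nn.Flatten(),  # (N, past, feat) -> (N, past*feat)
    # >>>         torch.nn.Linear(config["past_seq_len"] * config["input_feature_num"],
    # >>>                         config["future_seq_len"] * config["output_feature_num"]),
    # >>>         torch.nn.Unflatten(1, (config["future_seq_len"],
    # >>>                                config["output_feature_num"])),  # -> (N, future, out)
    # >>>     )
    # >>> autoest = AutoTSEstimator(model=my_model_creator,
    # >>>                           search_space={"lr": hp.loguniform(0.001, 0.01)},
    # >>>                           past_seq_len=6,
    # >>>                           future_seq_len=1,
    # >>>                           input_feature_num=4,
    # >>>                           output_target_num=1)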

    def fit(self,
            data,
            epochs=1,
            batch_size=32,
            validation_data=None,
            metric_threshold=None,
            n_sampling=1,
            search_alg=None,
            search_alg_params=None,
            scheduler=None,
            scheduler_params=None,
            ):
        """
        fit using AutoEstimator

        :param data: train data. For backend of "torch", data can be a TSDataset or
               a function that takes a config dictionary as parameter and returns a
               PyTorch DataLoader. For backend of "keras", data can be a TSDataset
               or a function that takes a config dictionary as parameter and returns
               a Tensorflow Dataset. Please note that you should stick to the same
               data type when you predict/evaluate/fit on the TSPipeline you get
               from `AutoTSEstimator.fit`.
        :param epochs: Max number of epochs to train in each trial. Defaults to 1.
               If you have also set metric_threshold, a trial will stop if either it
               has been optimized to the metric_threshold or it has been trained for
               {epochs} epochs.
        :param batch_size: Int or hp sampling function from an integer space.
               Training batch size. It defaults to 32.
        :param validation_data: Validation data. The validation data type should be
               the same as data.
        :param metric_threshold: a trial will be terminated when the metric
               threshold is met.
        :param n_sampling: Number of trials to evaluate in total. Defaults to 1. If
               hp.grid_search is in search_space, the grid will be run n_sampling
               times, and n_sampling will be rounded up according to hp.grid_search.
               If this is -1, (virtually) infinite samples are generated until a
               stopping condition is met.
        :param search_alg: str, any searcher supported by ray tune (i.e.
               "variant_generator", "random", "ax", "dragonfly", "skopt",
               "hyperopt", "bayesopt", "bohb", "nevergrad", "optuna", "zoopt" and
               "sigopt").
        :param search_alg_params: extra parameters for the searcher algorithm
               besides search_space, metric and searcher mode.
        :param scheduler: str, any scheduler supported by ray tune.
        :param scheduler_params: parameters for the scheduler.

        :return: a TSPipeline with the best model.
        """
        is_third_party_model = isinstance(self.model, AutoEstimator)

        # generate data creators from TSDataset (the pytorch base requires validation data)
        if isinstance(data, TSDataset) and isinstance(validation_data, TSDataset):
            train_d, val_d = self._prepare_data_creator(
                search_space=self.search_space if is_third_party_model
                else self.model.search_space,
                train_data=data,
                val_data=validation_data,
            )
            self._scaler = data.scaler
            self._scaler_index = data.scaler_index
        else:
            train_d, val_d = data, validation_data

        if is_third_party_model:
            self.search_space.update({"batch_size": batch_size})
            n_sampling = recalculate_n_sampling(self.search_space,
                                                n_sampling) if n_sampling != -1 else -1
            self.model.fit(
                data=train_d,
                epochs=epochs,
                validation_data=val_d,
                metric=self.metric,
                metric_mode=self.metric_mode,
                metric_threshold=metric_threshold,
                n_sampling=n_sampling,
                search_space=self.search_space,
                search_alg=search_alg,
                search_alg_params=search_alg_params,
                scheduler=scheduler,
                scheduler_params=scheduler_params,
            )

        if not is_third_party_model:
            self.model.fit(
                data=train_d,
                epochs=epochs,
                batch_size=batch_size,
                validation_data=val_d,
                metric_threshold=metric_threshold,
                n_sampling=n_sampling,
                search_alg=search_alg,
                search_alg_params=search_alg_params,
                scheduler=scheduler,
                scheduler_params=scheduler_params,
            )

        if self.backend == "torch":
            from bigdl.chronos.autots.tspipeline import TSPipeline
            best_model = self._get_best_automl_model()
            return TSPipeline(model=best_model.model,
                              loss=best_model.criterion,
                              optimizer=best_model.optimizer,
                              model_creator=best_model.model_creator,
                              loss_creator=best_model.loss_creator,
                              optimizer_creator=best_model.optimizer_creator,
                              best_config=self.get_best_config(),
                              scaler=self._scaler,
                              scaler_index=self._scaler_index)
        if self.backend == "keras":
            best_model = self._get_best_automl_model()
            return best_model

    def _prepare_data_creator(self, search_space, train_data, val_data=None):
        """
        prepare the data creators and add selected features to search_space

        :param search_space: the search space
        :param train_data: train data
        :param val_data: validation data

        :return: data creators from train and validation data
        """
        import ray
        from bigdl.nano.utils.common import invalidInputError

        # automatically infer output_feature_num
        # input_feature_num will be set by the base pytorch model
        # according to the selected features.
        search_space['output_feature_num'] = len(train_data.target_col)
        if search_space['past_seq_len'] == 'auto':
            cycle_length = train_data.get_cycle_length(aggregate='mode', top_k=3)
            cycle_length = 2 if cycle_length < 2 else cycle_length
            search_space['past_seq_len'] = hp.randint(cycle_length//2, cycle_length*2)

        # append feature selection into search space
        # TODO: more flexible setting
        all_features = train_data.feature_col
        if self.selected_features not in ('all', 'auto'):
            invalidInputError(False,
                              "Only 'all' and 'auto' are supported for selected_features, "
                              f"but found {self.selected_features}")
        if self.selected_features == "auto":
            if len(all_features) == 0:
                search_space['selected_features'] = all_features
            else:
                search_space['selected_features'] = hp.choice_n(all_features,
                                                                min_items=0,
                                                                max_items=len(all_features))
        if self.selected_features == "all":
            search_space['selected_features'] = all_features

        # put train/val data in ray
        train_data_id = ray.put(train_data)
        valid_data_id = ray.put(val_data)

        if self.backend == "torch":
            import torch
            from torch.utils.data import TensorDataset, DataLoader

            def train_data_creator(config):
                train_d = ray.get(train_data_id)
                x, y = train_d.roll(lookback=config.get('past_seq_len'),
                                    horizon=self._future_seq_len,
                                    feature_col=config['selected_features']) \
                              .to_numpy()
                return DataLoader(TensorDataset(torch.from_numpy(x).float(),
                                                torch.from_numpy(y).float()),
                                  batch_size=config["batch_size"],
                                  shuffle=True)

            def val_data_creator(config):
                val_d = ray.get(valid_data_id)
                x, y = val_d.roll(lookback=config.get('past_seq_len'),
                                  horizon=self._future_seq_len,
                                  feature_col=config['selected_features']) \
                            .to_numpy()
                return DataLoader(TensorDataset(torch.from_numpy(x).float(),
                                                torch.from_numpy(y).float()),
                                  batch_size=config["batch_size"],
                                  shuffle=True)

            return train_data_creator, val_data_creator

        if self.backend == "keras":
            def train_data_creator(config):
                train_d = ray.get(train_data_id)
                train_d.roll(lookback=config.get('past_seq_len'),
                             horizon=self._future_seq_len,
                             feature_col=config['selected_features'])
                return train_d.to_tf_dataset(batch_size=config["batch_size"],
                                             shuffle=True)

            def val_data_creator(config):
                val_d = ray.get(valid_data_id)
                val_d.roll(lookback=config.get('past_seq_len'),
                           horizon=self._future_seq_len,
                           feature_col=config['selected_features'])
                return val_d.to_tf_dataset(batch_size=config["batch_size"],
                                           shuffle=False)

            return train_data_creator, val_data_creator

    def _get_best_automl_model(self):
        """
        For internal use only.

        :return: the best automl model instance
        """
        return self.model._get_best_automl_model()

    def get_best_config(self):
        """
        Get the best configuration

        :return: A dictionary of best hyperparameters
        """
        return self.model.get_best_config()
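
    # Example (illustrative): after `fit`, the tuned hyperparameters can be
    # inspected, e.g. to check which past_seq_len the search settled on when
    # past_seq_len='auto' was used.
    #
    # >>> best_config = autoest.get_best_config()
    # >>> best_config["past_seq_len"]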