Source code for bigdl.chronos.forecaster.autoformer_forecaster

# Copyright 2016 The BigDL Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
from bigdl.chronos.forecaster.abstract import Forecaster
from bigdl.chronos.model.autoformer import model_creator, loss_creator
from import TensorDataset, DataLoader
from bigdl.chronos.model.autoformer.Autoformer import AutoFormer, _transform_config_to_namedtuple
from bigdl.nano.utils.log4Error import invalidInputError, invalidOperationError
from bigdl.chronos.forecaster.utils import check_transformer_data
from bigdl.chronos.pytorch import TSTrainer as Trainer
from import Space
from bigdl.chronos.forecaster.utils_hpo import GenericTSTransformerLightningModule, \

from .utils_hpo import _format_metric_str
import warnings

[docs]class AutoformerForecaster(Forecaster): def __init__(self, past_seq_len, future_seq_len, input_feature_num, output_feature_num, label_len, freq, output_attention=False, moving_avg=25, d_model=128, embed='timeF', dropout=0.05, factor=3, n_head=8, d_ff=256, activation='gelu', e_layers=2, d_layers=1, optimizer="Adam", loss="mse", lr=0.0001, lr_scheduler_milestones=[3, 4, 5, 6, 7, 8, 9, 10], metrics=["mse"], seed=None, distributed=False, workers_per_node=1, distributed_backend="ray"): """ Build a AutoformerForecaster Forecast Model. :param past_seq_len: Specify the history time steps (i.e. lookback). :param future_seq_len: Specify the output time steps (i.e. horizon). :param input_feature_num: Specify the feature dimension. :param output_feature_num: Specify the output dimension. :param label_len: Start token length of AutoFormer decoder. :param freq: Freq for time features encoding. You may choose from "s", "t","h","d","w","m" for second, minute, hour, day, week or month. :param optimizer: Specify the optimizer used for training. This value defaults to "Adam". :param loss: str or pytorch loss instance, Specify the loss function used for training. This value defaults to "mse". You can choose from "mse", "mae", "huber_loss" or any customized loss instance you want to use. :param lr: Specify the learning rate. This value defaults to 0.001. :param lr_scheduler_milestones: Specify the milestones parameters in torch.optim.lr_scheduler.MultiStepLR.This value defaults to [3, 4, 5, 6, 7, 8, 9, 10]. If you don't want to use scheduler, set this parameter to None to disbale lr_scheduler. :param metrics: A list contains metrics for evaluating the quality of forecasting. You may only choose from "mse" and "mae" for a distributed forecaster. You may choose from "mse", "mae", "rmse", "r2", "mape", "smape" or a callable function for a non-distributed forecaster. If callable function, it signature should be func(y_true, y_pred), where y_true and y_pred are numpy ndarray. :param seed: int, random seed for training. This value defaults to None. :param distributed: bool, if init the forecaster in a distributed fashion. If True, the internal model will use an Orca Estimator. If False, the internal model will use a pytorch model. The value defaults to False. :param workers_per_node: int, the number of worker you want to use. The value defaults to 1. The param is only effective when distributed is set to True. :param distributed_backend: str, select from "ray" or "horovod". The value defaults to "ray". :param kwargs: other hyperparameter please refer to """ # config setting self.data_config = { "past_seq_len": past_seq_len, "future_seq_len": future_seq_len, "input_feature_num": input_feature_num, "output_feature_num": output_feature_num, "label_len": label_len } self.model_config = { "seq_len": past_seq_len, "label_len": label_len, "pred_len": future_seq_len, "output_attention": output_attention, "moving_avg": moving_avg, "enc_in": input_feature_num, "d_model": d_model, "embed": embed, "freq": freq, "dropout": dropout, "dec_in": input_feature_num, "factor": factor, "n_head": n_head, "d_ff": d_ff, "activation": activation, "e_layers": e_layers, "c_out": output_feature_num, "d_layers": d_layers, "seed": seed, } self.loss_config = { "loss": loss } self.optim_config = { "lr": lr, "optim": optimizer, "lr_scheduler_milestones": lr_scheduler_milestones, } self.model_config.update(self.loss_config) self.model_config.update(self.optim_config) self.metrics = metrics self.distributed = distributed self.checkpoint_callback = True # seed setting if not isinstance(seed, Space): from pytorch_lightning import seed_everything seed_everything(seed=seed, workers=True) # disable multi-process training for now. # TODO: enable it in future. self.num_processes = 1 self.use_ipex = False self.onnx_available = False self.quantize_available = False self.use_amp = False self.use_hpo = True has_space = _config_has_search_space( config={**self.model_config, **self.optim_config, **self.loss_config, **self.data_config}) if not has_space: self.use_hpo = False self.internal = model_creator(self.model_config) self.model_creator = model_creator self.loss_creator = loss_creator def _build_automodel(self, data, validation_data=None, batch_size=32, epochs=1): """Build a Generic Model using config parameters.""" merged_config = {**self.model_config, **self.optim_config, **self.loss_config, **self.data_config} model_config_keys = list(self.model_config.keys()) data_config_keys = list(self.data_config.keys()) optim_config_keys = list(self.optim_config.keys()) loss_config_keys = list(self.loss_config.keys()) return GenericTSTransformerLightningModule( model_creator=self.model_creator, loss_creator=self.loss_creator, data=data, validation_data=validation_data, batch_size=batch_size, epochs=epochs, metrics=[_str2metric(metric) for metric in self.metrics], scheduler=None, # TODO num_processes=self.num_processes, model_config_keys=model_config_keys, data_config_keys=data_config_keys, optim_config_keys=optim_config_keys, loss_config_keys=loss_config_keys, **merged_config)
[docs] def tune(self, data, validation_data, target_metric='mse', direction="minimize", directions=None, n_trials=2, n_parallels=1, epochs=1, batch_size=32, acceleration=False, input_sample=None, **kwargs): """ Search the hyper parameter. :param data: train data, as numpy ndarray tuple (x, y, x_enc, y_enc) :param validation_data: validation data, as numpy ndarray tuple (x, y, x_enc, y_enc) :param target_metric: the target metric to optimize, a string or an instance of torchmetrics.metric.Metric, default to 'mse'. :param direction: in which direction to optimize the target metric, "maximize" - larger the better "minimize" - smaller the better default to "minimize". :param n_trials: number of trials to run :param n_parallels: number of parallel processes used to run trials. to use parallel tuning you need to use a RDB url for storage and specify study_name. For more information, refer to Nano AutoML user guide. :param epochs: the number of epochs to run in each trial fit, defaults to 1 :param batch_size: number of batch size for each trial fit, defaults to 32 :param acceleration: Whether to automatically consider the model after inference acceleration in the search process. It will only take effect if target_metric contains "latency". Default value is False. :param input_sample: A set of inputs for trace, defaults to None if you have trace before or model is a LightningModule with any dataloader attached. """ invalidInputError(not self.distributed, "HPO is not supported in distributed mode." "Please use AutoTS instead.") invalidOperationError(self.use_hpo, "HPO is disabled for this forecaster." "You may specify search space in hyper parameters to enable it.") # prepare data from bigdl.chronos.pytorch import TSTrainer as Trainer # data transformation if isinstance(data, tuple): check_transformer_data(data[0], data[1], data[2], data[3], self.data_config) if validation_data and isinstance(validation_data, tuple): check_transformer_data(validation_data[0], validation_data[1], validation_data[2], validation_data[3], self.data_config) else: invalidInputError(False, "To use tuning, you must provide validation_data" "as numpy arrays.") else: invalidInputError(False, "HPO only supports numpy train input data.") if input_sample is None: input_sample = (torch.from_numpy(data[0][:1, :, :]), torch.from_numpy(data[1][:1, :, :]), torch.from_numpy(data[2][:1, :, :]), torch.from_numpy(data[3][:1, :, :])) # prepare target metric if validation_data is not None: formated_target_metric = _format_metric_str('val', target_metric) else: invalidInputError(False, "To use tuning, you must provide validation_data" "as numpy arrays.") # build auto model self.tune_internal = self._build_automodel(data, validation_data, batch_size, epochs) self.trainer = Trainer(logger=False, max_epochs=epochs, checkpoint_callback=self.checkpoint_callback, num_processes=self.num_processes, use_ipex=self.use_ipex, use_hpo=True) # run hyper parameter search self.internal = self.tune_internal, n_trials=n_trials, target_metric=formated_target_metric, direction=direction, directions=directions, n_parallels=n_parallels, acceleration=acceleration, input_sample=input_sample, **kwargs) if self.trainer.hposearcher.objective.mo_hpo: return self.internal else: # reset train and validation datasets self.trainer.reset_train_val_dataloaders(self.internal)
[docs] def search_summary(self): # add tuning check invalidOperationError(self.use_hpo, "No search summary when HPO is disabled.") return self.trainer.search_summary()
[docs] def fit(self, data, epochs=1, batch_size=32, use_trial_id=None): """ Fit(Train) the forecaster. :param data: The data support following formats: | 1. numpy ndarrays: generate from `TSDataset.roll`, be sure to set label_len > 0 and time_enc = True | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`, be sure to set label_len > 0 and time_enc = True :param epochs: Number of epochs you want to train. The value defaults to 1. :param batch_size: Number of batch size you want to train. The value defaults to 32. if you input a pytorch dataloader for `data`, the batch_size will follow the batch_size setted in `data`. :param use_trail_id: choose a internal according to trial_id, which is used only in multi-objective search. """ # distributed is not supported. if self.distributed: invalidInputError(False, "distributed is not support in Autoformer") # transform a tuple to dataloader. if isinstance(data, tuple): data = DataLoader(TensorDataset(torch.from_numpy(data[0]), torch.from_numpy(data[1]), torch.from_numpy(data[2]), torch.from_numpy(data[3]),), batch_size=batch_size, shuffle=True) from bigdl.chronos.pytorch import TSTrainer as Trainer # Trainer init and fitting if not self.use_hpo: self.trainer = Trainer(logger=False, max_epochs=epochs, checkpoint_callback=self.checkpoint_callback, num_processes=1, use_ipex=self.use_ipex, distributed_backend="spawn") else: # check whether the user called the tune function invalidOperationError(hasattr(self, "trainer"), "There is no trainer, and you " "should call .tune() before .fit()") # build internal according to use_trail_id for multi-objective HPO if self.trainer.hposearcher.objective.mo_hpo: invalidOperationError(, "You must tune before fit the model.") invalidInputError(use_trial_id is not None, "For multibojective HPO, you must specify a trial id for fit.") trial =[use_trial_id] self.internal = self.tune_internal._model_build(trial), data)
[docs] def predict(self, data, batch_size=32): """ Predict using a trained forecaster. :param data: The data support following formats: | 1. numpy ndarrays: generate from `TSDataset.roll`, be sure to set label_len > 0 and time_enc = True | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`, be sure to set label_len > 0, time_enc = True and is_predict = True :param batch_size: predict batch size. The value will not affect predict result but will affect resources cost(e.g. memory and time). :return: A list of numpy ndarray """ if self.distributed: invalidInputError(False, "distributed is not support in Autoformer") invalidInputError(isinstance(data, tuple) or isinstance(data, DataLoader), "The input data to predict() support formats: numpy ndarray tuple" f" and pytorch dataloader, but found {type(data)}.") if isinstance(data, tuple): data = DataLoader(TensorDataset(torch.from_numpy(data[0]), torch.from_numpy(data[1]), torch.from_numpy(data[2]), torch.from_numpy(data[3]),), batch_size=batch_size, shuffle=False) return self.trainer.predict(self.internal, data)
[docs] def evaluate(self, data, batch_size=32): """ Predict using a trained forecaster. :param data: The data support following formats: | 1. numpy ndarrays: generate from `TSDataset.roll`, be sure to set label_len > 0 and time_enc = True | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`, be sure to set label_len > 0 and time_enc = True :param batch_size: predict batch size. The value will not affect predict result but will affect resources cost(e.g. memory and time). :return: A dict, currently returns the loss rather than metrics """ # TODO: use metrics here if self.distributed: invalidInputError(False, "distributed is not support in Autoformer") if isinstance(data, tuple): data = DataLoader(TensorDataset(torch.from_numpy(data[0]), torch.from_numpy(data[1]), torch.from_numpy(data[2]), torch.from_numpy(data[3]),), batch_size=batch_size, shuffle=False) return self.trainer.validate(self.internal, data)
[docs] def load(self, checkpoint_file): """ restore the forecaster. :param checkpoint_file: The checkpoint file location you want to load the forecaster. """ self.trainer = Trainer(logger=False, max_epochs=1, checkpoint_callback=self.checkpoint_callback, num_processes=1, use_ipex=self.use_ipex, distributed_backend="spawn") checkpoint = torch.load(checkpoint_file) config = checkpoint["hyper_parameters"] args = _transform_config_to_namedtuple(config) internal = AutoFormer.load_from_checkpoint(checkpoint_file, configs=args) self.internal = internal
[docs] def save(self, checkpoint_file): """ save the forecaster. :param checkpoint_file: The checkpoint file location you want to load the forecaster. """ if self.use_hpo: self.trainer.model = self.trainer.model.model self.trainer.save_checkpoint(checkpoint_file)
def _str2metric(metric): # map metric str to function if isinstance(metric, str): metric_name = metric from bigdl.chronos.metric.forecast_metrics import REGRESSION_MAP metric_func = REGRESSION_MAP[metric_name] def metric(y_label, y_predict): y_label = y_label.numpy() y_predict = y_predict.numpy() return metric_func(y_label, y_predict) metric.__name__ = metric_name return metric