Source code for bigdl.chronos.forecaster.autoformer_forecaster

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import torch
import numpy as np
from pandas import Timedelta
from bigdl.chronos.forecaster.abstract import Forecaster
from bigdl.chronos.forecaster.utils import read_csv, delete_folder
from bigdl.chronos.metric.forecast_metrics import Evaluator
from bigdl.chronos.model.autoformer import model_creator, loss_creator
from torch.utils.data import TensorDataset, DataLoader
from bigdl.chronos.model.autoformer.Autoformer import AutoFormer, _transform_config_to_namedtuple
from bigdl.nano.utils.common import invalidInputError, invalidOperationError
from bigdl.chronos.forecaster.utils import check_transformer_data
from bigdl.chronos.pytorch import TSTrainer as Trainer
from bigdl.chronos.data import TSDataset
from bigdl.nano.automl.hpo.space import Space
from bigdl.chronos.forecaster.utils_hpo import GenericTSTransformerLightningModule, \
    _config_has_search_space
from bigdl.chronos.pytorch.context_manager import DummyForecasterContextManager,\
    ForecasterContextManager

from .utils_hpo import _format_metric_str
import warnings
from tempfile import TemporaryDirectory
import os


class AutoformerForecaster(Forecaster):
    def __init__(self,
                 past_seq_len,
                 future_seq_len,
                 input_feature_num,
                 output_feature_num,
                 freq,
                 label_len=None,
                 output_attention=False,
                 moving_avg=25,
                 d_model=128,
                 embed='timeF',
                 dropout=0.05,
                 factor=3,
                 n_head=8,
                 d_ff=256,
                 activation='gelu',
                 e_layers=2,
                 d_layers=1,
                 optimizer="Adam",
                 loss="mse",
                 lr=0.0001,
                 lr_scheduler_milestones=[3, 4, 5, 6, 7, 8, 9, 10],
                 metrics=["mse"],
                 seed=None,
                 distributed=False,
                 workers_per_node=1,
                 distributed_backend="ray"):
        """
        Build an AutoformerForecaster Forecast Model.

        :param past_seq_len: Specify the history time steps (i.e. lookback).
        :param future_seq_len: Specify the output time steps (i.e. horizon).
        :param input_feature_num: Specify the feature dimension.
        :param output_feature_num: Specify the output dimension.
        :param freq: Freq for time features encoding. You may choose from "s", "t",
               "h", "d", "w", "m" for second, minute, hour, day, week or month.
        :param label_len: Start token length of AutoFormer decoder.
        :param optimizer: Specify the optimizer used for training. This value
               defaults to "Adam".
        :param loss: str or pytorch loss instance, Specify the loss function
               used for training. This value defaults to "mse". You can choose
               from "mse", "mae", "huber_loss" or any customized loss instance
               you want to use.
        :param lr: Specify the learning rate. This value defaults to 0.0001.
        :param lr_scheduler_milestones: Specify the milestones parameters in
               torch.optim.lr_scheduler.MultiStepLR. This value defaults to
               [3, 4, 5, 6, 7, 8, 9, 10]. If you don't want to use a scheduler,
               set this parameter to None to disable lr_scheduler.
        :param metrics: A list contains metrics for evaluating the quality of
               forecasting. You may only choose from "mse" and "mae" for a
               distributed forecaster. You may choose from "mse", "mae", "rmse",
               "r2", "mape", "smape" or a callable function for a non-distributed
               forecaster. If callable function, its signature should be
               func(y_true, y_pred), where y_true and y_pred are numpy ndarray.
        :param seed: int, random seed for training. This value defaults to None.
        :param distributed: bool, if init the forecaster in a distributed fashion.
               If True, the internal model will use an Orca Estimator. If False,
               the internal model will use a pytorch model. The value defaults
               to False.
        :param workers_per_node: int, the number of workers you want to use.
               The value defaults to 1. The param is only effective when
               distributed is set to True.
        :param distributed_backend: str, select from "ray" or "horovod". The
               value defaults to "ray".
        :param kwargs: for other hyperparameters, please refer to
               https://github.com/zhouhaoyi/Informer2020#usage
        """
        invalidInputError(past_seq_len > 1,
                          "past_seq_len of Autoformer must exceed one.")

        # config setting
        self.data_config = {
            "past_seq_len": past_seq_len,
            "future_seq_len": future_seq_len,
            "input_feature_num": input_feature_num,
            "output_feature_num": output_feature_num,
            "label_len": past_seq_len//2 if label_len is None else label_len
        }
        self.model_config = {
            "seq_len": past_seq_len,
            "label_len": past_seq_len//2 if label_len is None else label_len,
            "pred_len": future_seq_len,
            "output_attention": output_attention,
            "moving_avg": moving_avg,
            "enc_in": input_feature_num,
            "d_model": d_model,
            "embed": embed,
            "freq": freq,
            "dropout": dropout,
            "dec_in": input_feature_num,
            "factor": factor,
            "n_head": n_head,
            "d_ff": d_ff,
            "activation": activation,
            "e_layers": e_layers,
            "c_out": output_feature_num,
            "d_layers": d_layers,
            "seed": seed,
        }
        self.loss_config = {
            "loss": loss
        }
        self.optim_config = {
            "lr": lr,
            "optim": optimizer,
            "lr_scheduler_milestones": lr_scheduler_milestones,
        }

        self.model_config.update(self.loss_config)
        self.model_config.update(self.optim_config)

        self.metrics = metrics
        self.distributed = distributed
        self.checkpoint_callback = True

        # seed setting
        if not isinstance(seed, Space):
            from pytorch_lightning import seed_everything
            seed_everything(seed=seed, workers=True)

        # disable multi-process training for now.
        # TODO: enable it in future.
        self.num_processes = 1
        self.use_ipex = False
        self.onnx_available = False
        self.quantize_available = False
        self.use_amp = False
        self.use_hpo = True

        # Model preparation
        self.fitted = False

        has_space = _config_has_search_space(
            config={**self.model_config, **self.optim_config,
                    **self.loss_config, **self.data_config})

        if not has_space:
            self.use_hpo = False
            self.internal = model_creator(self.model_config)

        self.model_creator = model_creator
        self.loss_creator = loss_creator

        self.cxt_manager = DummyForecasterContextManager()
        self.context_enabled = False
        current_num_threads = torch.get_num_threads()
        self.thread_num = current_num_threads
        self.accelerate_method = None

    def _build_automodel(self, data, validation_data=None, batch_size=32, epochs=1):
        """Build a Generic Model using config parameters."""
        merged_config = {**self.model_config, **self.optim_config,
                         **self.loss_config, **self.data_config}

        model_config_keys = list(self.model_config.keys())
        data_config_keys = list(self.data_config.keys())
        optim_config_keys = list(self.optim_config.keys())
        loss_config_keys = list(self.loss_config.keys())

        return GenericTSTransformerLightningModule(
            model_creator=self.model_creator,
            loss_creator=self.loss_creator,
            data=data, validation_data=validation_data,
            batch_size=batch_size, epochs=epochs,
            metrics=[_str2metric(metric) for metric in self.metrics],
            scheduler=None,  # TODO
            num_processes=self.num_processes,
            model_config_keys=model_config_keys,
            data_config_keys=data_config_keys,
            optim_config_keys=optim_config_keys,
            loss_config_keys=loss_config_keys,
            **merged_config)
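
    # --- Illustrative usage (not executed) ---
    # A minimal sketch of constructing the forecaster directly. The window sizes,
    # feature dimensions and freq below are hypothetical placeholder values, not
    # defaults from this file.
    #
    #     from bigdl.chronos.forecaster.autoformer_forecaster import AutoformerForecaster
    #
    #     forecaster = AutoformerForecaster(past_seq_len=96,       # lookback window
    #                                       future_seq_len=24,     # horizon
    #                                       input_feature_num=2,   # target + 1 extra feature
    #                                       output_feature_num=1,  # target only
    #                                       freq='h',              # hourly data
    #                                       seed=42)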

    def tune(self,
             data,
             validation_data,
             target_metric='mse',
             direction="minimize",
             directions=None,
             n_trials=2,
             n_parallels=1,
             epochs=1,
             batch_size=32,
             acceleration=False,
             input_sample=None,
             **kwargs):
        """
        Search the hyper parameters.

        :param data: The data support following formats:

               | 1. numpy ndarrays: generate from `TSDataset.roll`,
               |    be sure to set label_len > 0 and time_enc = True

        :param validation_data: validation data, The data support following formats:

               | 1. numpy ndarrays: generate from `TSDataset.roll`,
               |    be sure to set label_len > 0 and time_enc = True

        :param target_metric: the target metric to optimize, a string or an
               instance of torchmetrics.metric.Metric, default to 'mse'.
        :param direction: in which direction to optimize the target metric,
               "maximize" - larger the better,
               "minimize" - smaller the better,
               default to "minimize".
        :param n_trials: number of trials to run
        :param n_parallels: number of parallel processes used to run trials.
               To use parallel tuning you need to use a RDB url for storage and
               specify study_name. For more information, refer to Nano AutoML
               user guide.
        :param epochs: the number of epochs to run in each trial fit, defaults to 1
        :param batch_size: number of batch size for each trial fit, defaults to 32
        :param acceleration: Whether to automatically consider the model after
               inference acceleration in the search process. It will only take
               effect if target_metric contains "latency". Default value is False.
        :param input_sample: A set of inputs for trace, defaults to None if you have
               traced before or the model is a LightningModule with any dataloader
               attached.
        """
        invalidInputError(not self.distributed,
                          "HPO is not supported in distributed mode. "
                          "Please use AutoTS instead.")
        invalidOperationError(self.use_hpo,
                              "HPO is disabled for this forecaster. "
                              "You may specify search space in hyper parameters to enable it.")
        # prepare data
        from bigdl.chronos.pytorch import TSTrainer as Trainer

        # data transformation
        if isinstance(data, tuple):
            check_transformer_data(data[0], data[1], data[2], data[3], self.data_config)
            if validation_data and isinstance(validation_data, tuple):
                check_transformer_data(validation_data[0], validation_data[1],
                                       validation_data[2], validation_data[3],
                                       self.data_config)
            else:
                invalidInputError(False,
                                  "To use tuning, you must provide validation_data "
                                  "as numpy arrays.")
        else:
            invalidInputError(False, "HPO only supports numpy train input data.")

        if input_sample is None:
            input_sample = (torch.from_numpy(data[0][:1, :, :]),
                            torch.from_numpy(data[1][:1, :, :]),
                            torch.from_numpy(data[2][:1, :, :]),
                            torch.from_numpy(data[3][:1, :, :]))

        # prepare target metric
        if validation_data is not None:
            formated_target_metric = _format_metric_str('val', target_metric)
        else:
            invalidInputError(False,
                              "To use tuning, you must provide validation_data "
                              "as numpy arrays.")

        # build auto model
        self.tune_internal = self._build_automodel(data, validation_data, batch_size, epochs)

        self.trainer = Trainer(logger=False, max_epochs=epochs,
                               checkpoint_callback=self.checkpoint_callback,
                               num_processes=self.num_processes,
                               use_ipex=self.use_ipex,
                               use_hpo=True)

        # run hyper parameter search
        self.internal = self.trainer.search(
            self.tune_internal,
            n_trials=n_trials,
            target_metric=formated_target_metric,
            direction=direction,
            directions=directions,
            n_parallels=n_parallels,
            acceleration=acceleration,
            input_sample=input_sample,
            **kwargs)

        if self.trainer.hposearcher.objective.mo_hpo:
            return self.internal
        # else:
        #     # reset train and validation datasets
        #     self.trainer.reset_train_val_dataloaders(self.internal)
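
    # --- Illustrative usage (not executed) ---
    # A sketch of hyperparameter search: pass search-space objects (assumed here
    # to come from bigdl.nano.automl.hpo.space) when building the forecaster, then
    # call tune() with numpy arrays produced by TSDataset.roll(..., time_enc=True,
    # label_len=...). All concrete values below are placeholders.
    #
    #     import bigdl.nano.automl.hpo.space as space
    #
    #     forecaster = AutoformerForecaster(past_seq_len=96, future_seq_len=24,
    #                                       input_feature_num=2, output_feature_num=1,
    #                                       freq='h',
    #                                       d_model=space.Categorical(64, 128),
    #                                       d_ff=space.Categorical(128, 256))
    #     train_data = train_tsdata.roll(lookback=96, horizon=24,
    #                                    time_enc=True, label_len=48).to_numpy()
    #     val_data = val_tsdata.roll(lookback=96, horizon=24,
    #                                time_enc=True, label_len=48).to_numpy()
    #     forecaster.tune(train_data, validation_data=val_data,
    #                     target_metric='mse', n_trials=4, epochs=2)
    #     print(forecaster.search_summary())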

    def search_summary(self):
        """
        Return search summary of HPO.
        """
        # add tuning check
        invalidOperationError(self.use_hpo, "No search summary when HPO is disabled.")
        return self.trainer.search_summary()

    def fit(self, data, validation_data=None, epochs=1, batch_size=32,
            validation_mode='output', earlystop_patience=1, use_trial_id=None):
        """
        Fit(Train) the forecaster.

        :param data: The data support following formats:

               | 1. numpy ndarrays: generate from `TSDataset.roll`,
               |    be sure to set label_len > 0 and time_enc = True
               | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
               |    be sure to set label_len > 0 and time_enc = True
               | 3. A bigdl.chronos.data.tsdataset.TSDataset instance

        :param validation_data: Validation sample for validation loop. Defaults to 'None'.
               If you do not input data for 'validation_data', the validation_step will be
               skipped. The validation_data support following formats:

               | 1. numpy ndarrays: generate from `TSDataset.roll`,
               |    be sure to set label_len > 0 and time_enc = True
               | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
               |    be sure to set label_len > 0 and time_enc = True
               | 3. A bigdl.chronos.data.tsdataset.TSDataset instance

        :param epochs: Number of epochs you want to train. The value defaults to 1.
        :param batch_size: Number of batch size you want to train. The value defaults to 32.
               If you input a pytorch dataloader for `data`, the batch_size will follow the
               batch_size set in `data`.
        :param validation_mode: A str represent the operation mode while having
               'validation_data'. Defaults to 'output'. The validation_mode includes the
               following types:

               | 1. output:
               | If you choose 'output' for validation_mode, it will return a dict that
               | records the average validation loss of each epoch.
               |
               | 2. earlystop:
               | Monitor the val_loss and stop training when it stops improving.
               |
               | 3. best_epoch:
               | Monitor the val_loss. And load the checkpoint of the epoch with the
               | smallest val_loss after the training.

        :param earlystop_patience: Number of checks with no improvement after which training
               will be stopped. It takes effect when 'validation_mode' is 'earlystop'. Under
               the default configuration, one check happens after every training epoch.
        :param use_trial_id: choose an internal model according to trial_id, which is used
               only in multi-objective search.
        """
        # distributed is not supported.
        if self.distributed:
            invalidInputError(False, "distributed is not supported in Autoformer")

        # transform a tuple to dataloader.
        if isinstance(data, tuple):
            data = DataLoader(TensorDataset(torch.from_numpy(data[0]),
                                            torch.from_numpy(data[1]),
                                            torch.from_numpy(data[2]),
                                            torch.from_numpy(data[3]),),
                              batch_size=batch_size,
                              shuffle=True)

        # transform a TSDataset instance to dataloader
        if isinstance(data, TSDataset):
            _rolled = data.numpy_x is None
            data = data.to_torch_data_loader(batch_size=batch_size,
                                             roll=_rolled,
                                             lookback=self.data_config['past_seq_len'],
                                             horizon=self.data_config['future_seq_len'],
                                             label_len=self.data_config['label_len'],
                                             time_enc=True,
                                             feature_col=data.roll_feature,
                                             target_col=data.roll_target,
                                             shuffle=True)

        from bigdl.chronos.pytorch import TSTrainer as Trainer

        if self.use_hpo is True:
            # check whether the user called the tune function
            invalidOperationError(hasattr(self, "trainer"), "There is no trainer, and you "
                                  "should call .tune() before .fit()")
            # build internal according to use_trial_id for multi-objective HPO
            if self.trainer.hposearcher.objective.mo_hpo:
                invalidOperationError(self.trainer.hposearcher.study,
                                      "You must tune before fit the model.")
                invalidInputError(use_trial_id is not None,
                                  "For multi-objective HPO, you must specify a trial id "
                                  "for fit.")
                trial = self.trainer.hposearcher.study.trials[use_trial_id]
                self.internal = self.tune_internal._model_build(trial)

        with TemporaryDirectory() as forecaster_log_dir:
            with TemporaryDirectory() as validation_ckpt_dir:
                from pytorch_lightning.loggers import CSVLogger
                logger = False if validation_data is None else CSVLogger(
                    save_dir=forecaster_log_dir,
                    flush_logs_every_n_steps=10,
                    name="forecaster_tmp_log")
                from pytorch_lightning.callbacks import EarlyStopping
                early_stopping = EarlyStopping('val_loss', patience=earlystop_patience)
                from pytorch_lightning.callbacks import ModelCheckpoint
                checkpoint_callback = ModelCheckpoint(monitor="val_loss",
                                                      dirpath=validation_ckpt_dir,
                                                      filename='best',
                                                      save_on_train_epoch_end=True)
                if validation_mode == 'earlystop':
                    callbacks = [early_stopping]
                elif validation_mode == 'best_epoch':
                    callbacks = [checkpoint_callback]
                else:
                    callbacks = None

                # Trainer init
                self.trainer = Trainer(logger=logger, max_epochs=epochs,
                                       callbacks=callbacks,
                                       enable_checkpointing=self.checkpoint_callback,
                                       num_processes=self.num_processes,
                                       use_ipex=self.use_ipex,
                                       log_every_n_steps=10)

                # fitting
                if validation_data is None:
                    self.trainer.fit(self.internal, data)
                    self.fitted = True
                else:
                    if isinstance(validation_data, tuple):
                        validation_data = DataLoader(
                            TensorDataset(torch.from_numpy(validation_data[0]),
                                          torch.from_numpy(validation_data[1]),
                                          torch.from_numpy(validation_data[2]),
                                          torch.from_numpy(validation_data[3])),
                            batch_size=batch_size,
                            shuffle=False)

                    # transform a TSDataset instance to dataloader
                    if isinstance(validation_data, TSDataset):
                        _rolled = validation_data.numpy_x is None
                        validation_data = validation_data.to_torch_data_loader(
                            batch_size=batch_size,
                            roll=_rolled,
                            lookback=self.data_config['past_seq_len'],
                            horizon=self.data_config['future_seq_len'],
                            label_len=self.data_config['label_len'],
                            time_enc=True,
                            feature_col=validation_data.roll_feature,
                            target_col=validation_data.roll_target,
                            shuffle=False)

                    self.trainer.fit(self.internal, data, validation_data)
                    self.fitted = True
                    fit_csv = os.path.join(forecaster_log_dir,
                                           "forecaster_tmp_log/version_0/metrics.csv")
                    best_path = os.path.join(validation_ckpt_dir, "best.ckpt")
                    fit_out = read_csv(fit_csv, loss_name='val_loss')
                    if validation_mode == 'best_epoch':
                        self.load(best_path)

                    # modify logger attr in trainer, otherwise predict will report error
                    self.trainer._logger_connector.on_trainer_init(
                        False,
                        self.trainer.flush_logs_every_n_steps,
                        self.trainer.log_every_n_steps,
                        self.trainer.move_metrics_to_cpu)

                    return fit_out
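
    # --- Illustrative usage (not executed) ---
    # A sketch of fitting on TSDataset instances (format 3 in the docstring); the
    # forecaster rolls them internally with time_enc=True. train_tsdata and
    # val_tsdata are assumed to be preprocessed TSDataset objects.
    #
    #     fit_out = forecaster.fit(train_tsdata,
    #                              validation_data=val_tsdata,
    #                              epochs=3,
    #                              batch_size=32,
    #                              validation_mode='best_epoch')  # reload best checkpoint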

    def get_context(self, thread_num=None):
        """
        Obtain context manager from forecaster.

        :param thread_num: int, the num of thread limit. The value is set to None by
               default where no limit is set.

        :return: a context manager.
        """
        return ForecasterContextManager(self, thread_num, optimize=False)

    def predict(self, data, batch_size=32):
        """
        Predict using a trained forecaster.

        :param data: The data support following formats:

               | 1. numpy ndarrays: generate from `TSDataset.roll`,
               |    be sure to set label_len > 0 and time_enc = True
               | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
               |    be sure to set label_len > 0, time_enc = True and is_predict = True
               | 3. A bigdl.chronos.data.tsdataset.TSDataset instance

        :param batch_size: predict batch size. The value will not affect predict
               result but will affect resource cost (e.g. memory and time).

        :return: A list of numpy ndarray
        """
        if self.distributed:
            invalidInputError(False, "distributed is not supported in Autoformer")

        # transform a TSDataset instance to dataloader
        if isinstance(data, TSDataset):
            _rolled = data.numpy_x is None
            data = data.to_torch_data_loader(batch_size=batch_size,
                                             roll=_rolled,
                                             lookback=self.data_config['past_seq_len'],
                                             horizon=self.data_config['future_seq_len'],
                                             label_len=self.data_config['label_len'],
                                             time_enc=True,
                                             feature_col=data.roll_feature,
                                             target_col=data.roll_target,
                                             shuffle=False)

        invalidInputError(isinstance(data, tuple) or isinstance(data, DataLoader),
                          "The input data to predict() support formats: numpy ndarray tuple"
                          f" and pytorch dataloader, but found {type(data)}.")
        if isinstance(data, tuple):
            data = DataLoader(TensorDataset(torch.from_numpy(data[0]),
                                            torch.from_numpy(data[1]),
                                            torch.from_numpy(data[2]),
                                            torch.from_numpy(data[3]),),
                              batch_size=batch_size,
                              shuffle=False)
        if not self.context_enabled:
            self.cxt_manager = ForecasterContextManager(self, self.thread_num, optimize=False)
        with self.cxt_manager:
            return self.trainer.predict(self.internal, data)
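
    # --- Illustrative usage (not executed) ---
    # A sketch of prediction. The return value is a list of numpy ndarrays (one per
    # batch); np.concatenate stitches them into (num_samples, horizon, target_dim).
    # test_tsdata is an assumed preprocessed TSDataset.
    #
    #     batches = forecaster.predict(test_tsdata, batch_size=32)
    #     predictions = np.concatenate(batches, axis=0)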

    def evaluate(self, data, batch_size=32):
        """
        Evaluate using a trained forecaster.

        :param data: The data support following formats:

               | 1. numpy ndarrays: generate from `TSDataset.roll`,
               |    be sure to set label_len > 0 and time_enc = True
               | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
               |    be sure to set label_len > 0 and time_enc = True
               | 3. A bigdl.chronos.data.tsdataset.TSDataset instance

        :param batch_size: evaluate batch size. The value will not affect evaluate
               result but will affect resource cost (e.g. memory and time).

        :return: A dict, currently returns the loss rather than metrics
        """
        # TODO: use metrics here
        if self.distributed:
            invalidInputError(False, "distributed is not supported in Autoformer")

        # transform a TSDataset instance to dataloader
        if isinstance(data, TSDataset):
            _rolled = data.numpy_x is None
            data = data.to_torch_data_loader(batch_size=batch_size,
                                             roll=_rolled,
                                             lookback=self.data_config['past_seq_len'],
                                             horizon=self.data_config['future_seq_len'],
                                             label_len=self.data_config['label_len'],
                                             time_enc=True,
                                             feature_col=data.roll_feature,
                                             target_col=data.roll_target,
                                             shuffle=False)

        invalidInputError(isinstance(data, tuple) or isinstance(data, DataLoader),
                          "The input data to evaluate() support formats: numpy ndarray tuple"
                          f" and pytorch dataloader, but found {type(data)}.")
        if isinstance(data, tuple):
            data = DataLoader(TensorDataset(torch.from_numpy(data[0]),
                                            torch.from_numpy(data[1]),
                                            torch.from_numpy(data[2]),
                                            torch.from_numpy(data[3]),),
                              batch_size=batch_size,
                              shuffle=False)
        return self.trainer.validate(self.internal, data)
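
    # --- Illustrative usage (not executed) ---
    # A sketch of evaluation; as noted in the docstring it currently reports the
    # validation loss rather than the metrics configured at construction time.
    #
    #     eval_out = forecaster.evaluate(test_tsdata, batch_size=32)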

    def predict_interval(self, data, validation_data=None, batch_size=32,
                         repetition_times=5):
        """
        Calculate confidence interval of data based on Monte Carlo dropout (MC dropout).

        Related paper : https://arxiv.org/abs/1709.01907

        :param data: The data support following formats:

               | 1. numpy ndarrays: generate from `TSDataset.roll`,
               |    be sure to set label_len > 0 and time_enc = True
               | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
               |    be sure to set label_len > 0, time_enc = True
               | 3. A bigdl.chronos.data.tsdataset.TSDataset instance

        :param validation_data: The validation_data support following formats:

               | 1. numpy ndarrays: generate from `TSDataset.roll`,
               |    be sure to set label_len > 0 and time_enc = True
               | 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
               |    be sure to set label_len > 0, time_enc = True
               | 3. A bigdl.chronos.data.tsdataset.TSDataset instance

        :param batch_size: predict batch size. The value will not affect predict
               result but will affect resource cost (e.g. memory and time).
        :param repetition_times: Defines how many times to repeat the calculation of
               model uncertainty based on MC Dropout.

        :return: prediction and standard deviation, which are both numpy arrays
                 with shape (num_samples, horizon, target_dim)
        """
        from bigdl.chronos.pytorch.utils import _pytorch_fashion_inference

        if self.fitted is not True:
            invalidInputError(False,
                              "You must call fit or restore first before calling "
                              "predict_interval!")

        # step1, according to validation dataset, calculate inherent noise
        if not hasattr(self, "data_noise"):
            invalidInputError(validation_data is not None,
                              "When call predict_interval for the first time, you must pass in "
                              "validation_data to calculate data noise.")

            # transform a TSDataset instance to dataloader
            if isinstance(validation_data, TSDataset):
                _rolled = validation_data.numpy_x is None
                validation_data = validation_data.to_torch_data_loader(
                    batch_size=batch_size,
                    roll=_rolled,
                    lookback=self.data_config['past_seq_len'],
                    horizon=self.data_config['future_seq_len'],
                    label_len=self.data_config['label_len'],
                    time_enc=True,
                    feature_col=validation_data.roll_feature,
                    target_col=validation_data.roll_target,
                    shuffle=False,
                )

            # data transform
            if isinstance(validation_data, DataLoader):
                target = np.concatenate(tuple(val[1] for val in validation_data), axis=0)
            else:
                _, target, _, _ = validation_data
            target = target[:, -self.data_config['future_seq_len']:, :]

            _yhat = self.predict(validation_data)
            val_yhat = np.concatenate(_yhat, axis=0)
            self.data_noise = Evaluator.evaluate(["mse"], target,
                                                 val_yhat, aggregate=None)[0]  # 2d array

        # step2: calculate model uncertainty based on MC Dropout
        def apply_dropout(m):
            if type(m) == torch.nn.Dropout:
                m.train()  # turn on dropout

        self.internal.apply(apply_dropout)

        # transform a TSDataset instance to dataloader
        if isinstance(data, TSDataset):
            _rolled = data.numpy_x is None
            data = data.to_torch_data_loader(batch_size=batch_size,
                                             roll=_rolled,
                                             lookback=self.data_config['past_seq_len'],
                                             horizon=self.data_config['future_seq_len'],
                                             label_len=self.data_config['label_len'],
                                             time_enc=True,
                                             feature_col=data.roll_feature,
                                             target_col=data.roll_target,
                                             shuffle=False)

        def predict(data, model):
            # manually implement predict to avoid .eval() in trainer.predict()
            if isinstance(data, tuple):
                data = DataLoader(TensorDataset(torch.from_numpy(data[0]),
                                                torch.from_numpy(data[1]),
                                                torch.from_numpy(data[2]),
                                                torch.from_numpy(data[3]),),
                                  batch_size=batch_size,
                                  shuffle=False)
            outputs_list = []
            for batch in data:
                batch_x, batch_y, batch_x_mark, batch_y_mark = map(lambda x: x.float(), batch)
                outputs = model(batch_x, batch_x_mark, batch_y, batch_y_mark)
                outputs = outputs[:, -model.pred_len:, -model.c_out:]
                outputs_list.append(outputs.detach().numpy())
            return outputs_list

        y_hat_list = []
        for i in range(repetition_times):
            _yhat = predict(data, self.internal)
            yhat = np.concatenate(_yhat, axis=0)
            y_hat_list.append(yhat)
        y_hat_mean = np.mean(np.stack(y_hat_list, axis=0), axis=0)
        model_bias = np.zeros_like(y_hat_mean)  # 3d array
        for i in range(repetition_times):
            model_bias += (y_hat_list[i] - y_hat_mean)**2
        model_bias /= repetition_times
        std_deviation = np.sqrt(self.data_noise + model_bias)

        return y_hat_mean, std_deviation
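
    # --- Illustrative usage (not executed) ---
    # A sketch of MC-dropout intervals: the first call needs validation_data to
    # estimate the inherent data noise. A rough 95% band can then be formed from
    # mean +/- 1.96 * std (a common approximate choice, not prescribed by this code).
    #
    #     y_mean, y_std = forecaster.predict_interval(test_tsdata,
    #                                                 validation_data=val_tsdata,
    #                                                 repetition_times=5)
    #     upper = y_mean + 1.96 * y_std
    #     lower = y_mean - 1.96 * y_std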

    def get_model(self):
        """
        Returns the learned PyTorch Lightning model.

        :return: a pytorch lightning model instance
        """
        return self.internal

    def load(self, checkpoint_file):
        """
        restore the forecaster.

        :param checkpoint_file: The checkpoint file location you want to load the forecaster.
        """
        self.trainer = Trainer(logger=False, max_epochs=1,
                               checkpoint_callback=self.checkpoint_callback,
                               num_processes=1, use_ipex=self.use_ipex,
                               distributed_backend="spawn")
        checkpoint = torch.load(checkpoint_file)
        config = checkpoint["hyper_parameters"]
        args = _transform_config_to_namedtuple(config)
        internal = AutoFormer.load_from_checkpoint(checkpoint_file, configs=args)
        self.internal = internal

    def save(self, checkpoint_file):
        """
        save the forecaster.

        :param checkpoint_file: The checkpoint file location you want to save the forecaster to.
        """
        if self.use_hpo:
            self.trainer.model = self.trainer.model.model
        self.trainer.save_checkpoint(checkpoint_file)
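
    # --- Illustrative usage (not executed) ---
    # A sketch of checkpointing a fitted forecaster and restoring it later; the
    # path below is a placeholder.
    #
    #     forecaster.save("autoformer.ckpt")
    #     forecaster.load("autoformer.ckpt")
    #     predictions = forecaster.predict(test_tsdata)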

    @classmethod
    def from_tsdataset(cls,
                       tsdataset,
                       past_seq_len=None,
                       future_seq_len=None,
                       label_len=None,
                       freq=None,
                       **kwargs):
        """
        Build a Forecaster Model.

        :param tsdataset: A bigdl.chronos.data.tsdataset.TSDataset instance.
        :param past_seq_len: int or "auto", Specify the history time steps (i.e. lookback).
               Do not specify the 'past_seq_len' if your tsdataset has called
               the 'TSDataset.roll' method or 'TSDataset.to_torch_data_loader'.
               If "auto", the mode of time series' cycle length will be taken as the
               past_seq_len.
        :param future_seq_len: int or list, Specify the output time steps (i.e. horizon).
               Do not specify the 'future_seq_len' if your tsdataset has called
               the 'TSDataset.roll' method or 'TSDataset.to_torch_data_loader'.
        :param kwargs: Specify parameters of Forecaster, e.g. loss and optimizer, etc.
               For more info, please refer to the Forecaster.__init__ methods.

        :return: A Forecaster Model.
        """
        from bigdl.nano.utils.common import invalidInputError
        invalidInputError(isinstance(tsdataset, TSDataset),
                          f"We only support input a TSDataset, but got {type(tsdataset)}.")

        def check_time_steps(tsdataset, past_seq_len, future_seq_len):
            if tsdataset.lookback is not None and past_seq_len is not None:
                future_seq_len = future_seq_len if isinstance(future_seq_len, int)\
                    else max(future_seq_len)
                return tsdataset.lookback == past_seq_len and tsdataset.horizon == future_seq_len
            return True

        invalidInputError(not tsdataset._has_generate_agg_feature,
                          "We will add support for 'gen_rolling_feature' method later.")

        if tsdataset.lookback is not None:  # called roll or to_torch_data_loader
            past_seq_len = tsdataset.lookback
            future_seq_len = tsdataset.horizon if isinstance(tsdataset.horizon, int) \
                else max(tsdataset.horizon)
            output_feature_num = len(tsdataset.roll_target)
            input_feature_num = len(tsdataset.roll_feature) + output_feature_num
        elif past_seq_len is not None and future_seq_len is not None:  # initialize only
            past_seq_len = past_seq_len if isinstance(past_seq_len, int)\
                else tsdataset.get_cycle_length()
            future_seq_len = future_seq_len if isinstance(future_seq_len, int) \
                else max(future_seq_len)
            output_feature_num = len(tsdataset.target_col)
            input_feature_num = len(tsdataset.feature_col) + output_feature_num
        else:
            invalidInputError(False,
                              "Forecaster requires 'past_seq_len' and 'future_seq_len' to "
                              "specify the history time step and output time step.")

        if label_len is None:
            label_len = max(past_seq_len//2, 1)

        invalidInputError(tsdataset.label_len == label_len or tsdataset.label_len is None,
                          f"Expected label_len to be {tsdataset.label_len}, "
                          f"but found {label_len}")

        invalidInputError(check_time_steps(tsdataset, past_seq_len, future_seq_len),
                          "tsdataset already has history time steps and "
                          "differs from the given past_seq_len and future_seq_len. "
                          "Expected past_seq_len and future_seq_len to be "
                          f"{tsdataset.lookback, tsdataset.horizon}, "
                          f"but found {past_seq_len, future_seq_len}.",
                          fixMsg="Do not specify past_seq_len and future_seq_len "
                          "or call tsdataset.roll method again and specify time step")

        if tsdataset._freq is not None:
            infer_freq_str = _timedelta_to_delta_str(tsdataset._freq)
            freq = infer_freq_str

        return cls(past_seq_len=past_seq_len,
                   future_seq_len=future_seq_len,
                   input_feature_num=input_feature_num,
                   output_feature_num=output_feature_num,
                   freq=freq,
                   label_len=label_len,
                   **kwargs)
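
# --- Illustrative usage (not executed) ---
# A sketch of building the forecaster directly from a TSDataset; lookback, horizon
# and feature dimensions are inferred from the dataset (or from the explicit
# past_seq_len/future_seq_len if it has not been rolled yet). The dataframe and
# column names below are hypothetical.
#
#     from bigdl.chronos.data import TSDataset
#
#     tsdata = TSDataset.from_pandas(df, dt_col="timestamp", target_col="value")
#     forecaster = AutoformerForecaster.from_tsdataset(tsdata,
#                                                      past_seq_len=96,
#                                                      future_seq_len=24)
#     forecaster.fit(tsdata, epochs=3)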


def _str2metric(metric):
    # map metric str to function
    if isinstance(metric, str):
        metric_name = metric
        from bigdl.chronos.metric.forecast_metrics import REGRESSION_MAP
        metric_func = REGRESSION_MAP[metric_name]

        def metric(y_label, y_predict):
            y_label = y_label.numpy()
            y_predict = y_predict.numpy()
            return metric_func(y_label, y_predict)
        metric.__name__ = metric_name
    return metric


def _timedelta_to_delta_str(offset):
    features_by_offsets = (
        (Timedelta(seconds=60), 's'),
        (Timedelta(minutes=60), 't'),
        (Timedelta(hours=24), 'h'),
        (Timedelta(days=7), 'd'),
        (Timedelta(days=30), 'w'),
        (Timedelta(days=365), 'm'),
    )
    for offset_type, offset_str in features_by_offsets:
        if offset < offset_type:
            return offset_str
    return 'a'
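

# --- Illustrative mapping (not executed) ---
# A quick sketch of how _timedelta_to_delta_str buckets a pandas Timedelta into the
# freq strings accepted by the constructor: spacings under a minute map to 's', under
# an hour to 't', and so on up to 'm' for monthly; coarser spacings fall through to 'a'.
#
#     from pandas import Timedelta
#     _timedelta_to_delta_str(Timedelta(minutes=1))   # 't' (minutely data)
#     _timedelta_to_delta_str(Timedelta(hours=1))     # 'h' (hourly data)
#     _timedelta_to_delta_str(Timedelta(days=1))      # 'd' (daily data)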