Source code for bigdl.chronos.forecaster.base_forecaster

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from bigdl.chronos.forecaster.abstract import Forecaster
from bigdl.chronos.forecaster.utils import\
    np_to_creator, set_pytorch_seed, check_data, xshard_to_np, np_to_xshard, loader_to_creator
from bigdl.chronos.metric.forecast_metrics import Evaluator

import numpy as np
import warnings
import torch
from functools import partial
from torch.utils.data import TensorDataset, DataLoader


class BasePytorchForecaster(Forecaster):
    '''
    Forecaster base model for lstm, seq2seq and tcn forecasters.
    '''
    def __init__(self, **kwargs):
        if self.distributed:
            from bigdl.orca.learn.pytorch.estimator import Estimator
            from bigdl.orca.learn.metrics import MSE, MAE
            ORCA_METRICS = {"mse": MSE, "mae": MAE}

            def model_creator_orca(config):
                set_pytorch_seed(self.seed)
                model = self.model_creator({**self.model_config, **self.data_config})
                model.train()
                return model
            self.internal = Estimator.from_torch(model=model_creator_orca,
                                                 optimizer=self.optimizer_creator,
                                                 loss=self.loss_creator,
                                                 metrics=[ORCA_METRICS[name]()
                                                          for name in self.metrics],
                                                 backend=self.distributed_backend,
                                                 use_tqdm=True,
                                                 config={"lr": self.lr},
                                                 workers_per_node=self.workers_per_node)
        else:
            # seed setting
            from pytorch_lightning import seed_everything
            from bigdl.nano.pytorch.trainer import Trainer
            seed_everything(seed=self.seed)

            # model preparation
            self.fitted = False
            model = self.model_creator({**self.model_config, **self.data_config})
            loss = self.loss_creator(self.loss_config)
            optimizer = self.optimizer_creator(model, self.optim_config)
            self.internal = Trainer.compile(model=model, loss=loss,
                                            optimizer=optimizer,
                                            onnx=self.onnx_available)

    def fit(self, data, epochs=1, batch_size=32):
        # TODO: give an option to close validation during fit to save time.
        """
        Fit(Train) the forecaster.

        :param data: The data support following formats:

               | 1. a numpy ndarray tuple (x, y):
               | x's shape is (num_samples, lookback, feature_dim) where lookback and feature_dim
               | should be the same as past_seq_len and input_feature_num.
               | y's shape is (num_samples, horizon, target_dim), where horizon and target_dim
               | should be the same as future_seq_len and output_feature_num.
               |
               | 2. a xshard item:
               | each partition can be a dictionary of {'x': x, 'y': y}, where x and y's shape
               | should follow the shape stated before.
               |
               | 3. pytorch dataloader:
               | the dataloader should return x, y in each iteration with the shape as following:
               | x's shape is (num_samples, lookback, feature_dim) where lookback and feature_dim
               | should be the same as past_seq_len and input_feature_num.
               | y's shape is (num_samples, horizon, target_dim), where horizon and target_dim
               | should be the same as future_seq_len and output_feature_num.

        :param epochs: Number of epochs you want to train. The value defaults to 1.
        :param batch_size: Batch size you want to train with. The value defaults to 32.
               If you input a pytorch dataloader for `data`, the batch_size will follow the
               batch_size set in `data`. If the forecaster is distributed, the batch_size will
               be evenly distributed to all workers.

        :return: Evaluation results on data for a distributed forecaster, None for a local one.
        """
        # input transform
        if isinstance(data, DataLoader) and self.distributed:
            data = loader_to_creator(data)
        if isinstance(data, tuple) and self.distributed:
            data = np_to_creator(data)
        try:
            from bigdl.orca.data.shard import SparkXShards
            if isinstance(data, SparkXShards) and not self.distributed:
                warnings.warn("Xshards is collected to local since the "
                              "forecaster is non-distributed.")
                data = xshard_to_np(data)
        except ImportError:
            pass

        # fit on internal
        if self.distributed:
            # for cluster mode
            from bigdl.orca.common import OrcaContext
            sc = OrcaContext.get_spark_context().getConf()
            num_nodes = 1 if sc.get('spark.master').startswith('local') \
                else int(sc.get('spark.executor.instances'))
            if batch_size % self.workers_per_node != 0:
                raise RuntimeError("Please make sure that batch_size is divisible by "
                                   "the product of workers_per_node and num_nodes, "
                                   f"but 'batch_size' is {batch_size}, 'workers_per_node' "
                                   f"is {self.workers_per_node}, 'num_nodes' is {num_nodes}")
            batch_size //= (self.workers_per_node * num_nodes)
            return self.internal.fit(data=data,
                                     epochs=epochs,
                                     batch_size=batch_size)
        else:
            from bigdl.nano.pytorch.trainer import Trainer

            # numpy data shape checking
            if isinstance(data, tuple):
                check_data(data[0], data[1], self.data_config)
            else:
                warnings.warn("Data shape checking is not supported by dataloader input.")

            # data transformation
            if isinstance(data, tuple):
                data = DataLoader(TensorDataset(torch.from_numpy(data[0]),
                                                torch.from_numpy(data[1])),
                                  batch_size=batch_size,
                                  shuffle=True)

            # Trainer init and fitting
            self.trainer = Trainer(logger=False, max_epochs=epochs,
                                   checkpoint_callback=self.checkpoint_callback,
                                   num_processes=self.num_processes,
                                   use_ipex=self.use_ipex)
            self.trainer.fit(self.internal, data)
            self.fitted = True
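
    # Example: a minimal sketch of fitting a local (non-distributed) forecaster on
    # numpy data. The TCNForecaster subclass and the shapes/hyperparameters here are
    # illustrative assumptions, not part of this module.
    # >>> import numpy as np
    # >>> from bigdl.chronos.forecaster import TCNForecaster
    # >>> x = np.random.randn(100, 48, 2).astype(np.float32)  # (num_samples, lookback, feature_dim)
    # >>> y = np.random.randn(100, 5, 2).astype(np.float32)   # (num_samples, horizon, target_dim)
    # >>> forecaster = TCNForecaster(past_seq_len=48, future_seq_len=5,
    # ...                            input_feature_num=2, output_feature_num=2)
    # >>> forecaster.fit((x, y), epochs=3, batch_size=32)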

    def predict(self, data, batch_size=32, quantize=False):
        """
        Predict using a trained forecaster.

        If you want to predict on a single node (which is common practice),
        please call .to_local().predict(x, ...)

        :param data: The data support following formats:

               | 1. a numpy ndarray x:
               | x's shape is (num_samples, lookback, feature_dim) where lookback and feature_dim
               | should be the same as past_seq_len and input_feature_num.
               | 2. a xshard item:
               | each partition can be a dictionary of {'x': x}, where x's shape
               | should follow the shape stated before.

        :param batch_size: predict batch size. The value will not affect predict
               result but will affect resources cost (e.g. memory and time).
        :param quantize: whether to use the quantized model to predict.

        :return: A numpy array with shape (num_samples, horizon, target_dim)
                 if data is a numpy ndarray. A xshard item with format
                 {'prediction': result}, where result is a numpy array with shape
                 (num_samples, horizon, target_dim) if data is a xshard item.
        """
        # data transform
        is_local_data = isinstance(data, np.ndarray)
        if is_local_data and self.distributed:
            data = np_to_xshard(data)
        if not is_local_data and not self.distributed:
            data = xshard_to_np(data, mode="predict")

        if self.distributed:
            yhat = self.internal.predict(data, batch_size=batch_size)
            if is_local_data:
                expand_dim = []
                if self.data_config["future_seq_len"] == 1:
                    expand_dim.append(1)
                if self.data_config["output_feature_num"] == 1:
                    expand_dim.append(2)
                yhat = xshard_to_np(yhat, mode="yhat", expand_dim=expand_dim)
            return yhat
        else:
            if not self.fitted:
                raise RuntimeError("You must call fit or restore first before calling predict!")
            yhat = self.internal.inference(torch.from_numpy(data),
                                           backend=None,
                                           quantize=quantize).numpy()
            if not is_local_data:
                yhat = np_to_xshard(yhat, prefix="prediction")
            return yhat
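
    # Example: a minimal sketch of predicting with the forecaster fitted above;
    # the input x is assumed to have the same lookback and feature_dim as in fit.
    # >>> yhat = forecaster.predict(x)  # numpy in, numpy out
    # >>> yhat.shape                    # (100, 5, 2), i.e. (num_samples, horizon, target_dim)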

    def predict_with_onnx(self, data, batch_size=32):
        """
        Predict using a trained forecaster with onnxruntime. The method can only be
        used when the forecaster is a non-distributed version.

        Directly calling this method without calling build_onnx is valid, and the
        forecaster will automatically build an onnxruntime session with default settings.

        :param data: The data support following formats:

               | 1. a numpy ndarray x:
               | x's shape is (num_samples, lookback, feature_dim) where lookback and feature_dim
               | should be the same as past_seq_len and input_feature_num.

        :param batch_size: predict batch size. The value will not affect predict
               result but will affect resources cost (e.g. memory and time). Defaults
               to 32. None for all-data-single-time inference.

        :return: A numpy array with shape (num_samples, horizon, target_dim).
        """
        if self.distributed:
            raise NotImplementedError("ONNX inference has not been supported for distributed "
                                      "forecaster. You can call .to_local() to transform the "
                                      "forecaster to a non-distributed version.")
        if not self.fitted:
            raise RuntimeError("You must call fit or restore first before calling predict!")
        return self.internal.inference(data, batch_size=batch_size)
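
    # Example: a minimal sketch of onnxruntime-backed prediction; build_onnx is
    # optional since a default session is created on the first call.
    # >>> yhat_onnx = forecaster.predict_with_onnx(x)
    # >>> # or pre-build the session first (see build_onnx below):
    # >>> # forecaster.build_onnx(thread_num=1)
    # >>> # yhat_onnx = forecaster.predict_with_onnx(x)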

    def evaluate(self, data, batch_size=32, multioutput="raw_values", quantize=False):
        """
        Evaluate using a trained forecaster.

        If you want to evaluate on a single node (which is common practice),
        please call .to_local().evaluate(data, ...)

        Please note that the evaluate result is calculated on scaled y and yhat. If you
        scaled your data (e.g. used .scale() on the TSDataset) and need to evaluate
        on unscaled data, please follow this code snippet:

        >>> from bigdl.orca.automl.metrics import Evaluator
        >>> y_hat = forecaster.predict(x)
        >>> y_hat_unscaled = tsdata.unscale_numpy(y_hat)  # or other customized unscale methods
        >>> y_unscaled = tsdata.unscale_numpy(y)  # or other customized unscale methods
        >>> Evaluator.evaluate(metric, y_unscaled, y_hat_unscaled, multioutput=...)

        :param data: The data support following formats:

               | 1. a numpy ndarray tuple (x, y):
               | x's shape is (num_samples, lookback, feature_dim) where lookback and feature_dim
               | should be the same as past_seq_len and input_feature_num.
               | y's shape is (num_samples, horizon, target_dim), where horizon and target_dim
               | should be the same as future_seq_len and output_feature_num.
               | 2. a xshard item:
               | each partition can be a dictionary of {'x': x, 'y': y}, where x and y's shape
               | should follow the shape stated before.

        :param batch_size: evaluate batch size. The value will not affect evaluate
               result but will affect resources cost (e.g. memory and time).
        :param multioutput: Defines aggregating of multiple output values.
               String in ['raw_values', 'uniform_average']. The value defaults to
               'raw_values'. The param is only effective when the forecaster is a
               non-distributed version.
        :param quantize: whether to use the quantized model to predict.

        :return: A list of evaluation results. Each item represents a metric.
        """
        # data transform
        is_local_data = isinstance(data, tuple)
        if not is_local_data and not self.distributed:
            data = xshard_to_np(data, mode="fit")
        if self.distributed:
            if is_local_data:
                return self.internal.evaluate(data=np_to_creator(data),
                                              batch_size=batch_size)
            else:
                return self.internal.evaluate(data=data,
                                              batch_size=batch_size)
        else:
            if not self.fitted:
                raise RuntimeError("You must call fit or restore first before calling evaluate!")
            yhat_torch = self.internal.inference(torch.from_numpy(data[0]),
                                                 backend=None,
                                                 quantize=quantize)
            aggregate = 'mean' if multioutput == 'uniform_average' else None
            return Evaluator.evaluate(self.metrics, data[1],
                                      yhat_torch.numpy(), aggregate=aggregate)

    def evaluate_with_onnx(self, data, batch_size=32, multioutput="raw_values"):
        """
        Evaluate using a trained forecaster with onnxruntime. The method can only be
        used when the forecaster is a non-distributed version.

        Directly calling this method without calling build_onnx is valid, and the
        forecaster will automatically build an onnxruntime session with default settings.

        Please note that the evaluate result is calculated on scaled y and yhat. If you
        scaled your data (e.g. used .scale() on the TSDataset) and need to evaluate
        on unscaled data, please follow this code snippet:

        >>> from bigdl.orca.automl.metrics import Evaluator
        >>> y_hat = forecaster.predict(x)
        >>> y_hat_unscaled = tsdata.unscale_numpy(y_hat)  # or other customized unscale methods
        >>> y_unscaled = tsdata.unscale_numpy(y)  # or other customized unscale methods
        >>> Evaluator.evaluate(metric, y_unscaled, y_hat_unscaled, multioutput=...)

        :param data: The data support following formats:

               | 1. a numpy ndarray tuple (x, y):
               | x's shape is (num_samples, lookback, feature_dim) where lookback and feature_dim
               | should be the same as past_seq_len and input_feature_num.
               | y's shape is (num_samples, horizon, target_dim), where horizon and target_dim
               | should be the same as future_seq_len and output_feature_num.

        :param batch_size: evaluate batch size. The value will not affect evaluate
               result but will affect resources cost (e.g. memory and time).
        :param multioutput: Defines aggregating of multiple output values.
               String in ['raw_values', 'uniform_average']. The value defaults to
               'raw_values'.

        :return: A list of evaluation results. Each item represents a metric.
        """
        if self.distributed:
            raise NotImplementedError("ONNX inference has not been supported for distributed "
                                      "forecaster. You can call .to_local() to transform the "
                                      "forecaster to a non-distributed version.")
        if not self.fitted:
            raise RuntimeError("You must call fit or restore first before calling evaluate!")
        yhat = self.internal.inference(data[0], batch_size=batch_size)
        aggregate = 'mean' if multioutput == 'uniform_average' else None
        return Evaluator.evaluate(self.metrics, data[1], yhat, aggregate=aggregate)

    def save(self, checkpoint_file, quantize_checkpoint_file=None):
        """
        Save the forecaster.

        Please note that if you only want the pytorch model or onnx model
        file, you can call .get_model() or .export_onnx_file(). The checkpoint
        file generated by the .save() method can only be used by .load().

        :param checkpoint_file: The location you want to save the forecaster.
        :param quantize_checkpoint_file: The location you want to save the quantized forecaster.
        """
        if self.distributed:
            self.internal.save(checkpoint_file)
        else:
            if not self.fitted:
                raise RuntimeError("You must call fit or restore first before calling save!")
            self.trainer.save_checkpoint(checkpoint_file)  # save current status
            if quantize_checkpoint_file:
                try:
                    torch.save(self.internal.quantized_state_dict(),
                               quantize_checkpoint_file)
                except RuntimeError:
                    warnings.warn("Please call the .quantize() method to build "
                                  "an up-to-date quantized model")

    def load(self, checkpoint_file, quantize_checkpoint_file=None):
        """
        Restore the forecaster.

        :param checkpoint_file: The checkpoint file location you want to load the
               forecaster from.
        :param quantize_checkpoint_file: The checkpoint file location you want to
               load the quantized forecaster from.
        """
        if self.distributed:
            self.internal.load(checkpoint_file)
        else:
            from bigdl.nano.pytorch.lightning import LightningModuleFromTorch
            from bigdl.nano.pytorch.trainer import Trainer
            model = self.model_creator({**self.model_config, **self.data_config})
            loss = self.loss_creator(self.loss_config)
            optimizer = self.optimizer_creator(model, self.optim_config)
            self.internal = LightningModuleFromTorch.load_from_checkpoint(checkpoint_file,
                                                                          model=model,
                                                                          loss=loss,
                                                                          optimizer=optimizer)
            self.internal = Trainer.compile(self.internal, onnx=self.onnx_available)
            self.fitted = True
            if quantize_checkpoint_file:
                self.internal.load_quantized_state_dict(torch.load(quantize_checkpoint_file))
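
    # Example: a minimal sketch of a save/load round trip; the checkpoint path is
    # an illustrative placeholder.
    # >>> forecaster.save("forecaster.ckpt")
    # >>> forecaster.load("forecaster.ckpt")
    # >>> yhat = forecaster.predict(x)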

    def to_local(self):
        """
        Transform a distributed forecaster to a local (non-distributed) one.

        Common practice is to use distributed training (fit) and predict/
        evaluate with onnx or other frameworks on a single node. To do so,
        you need to call .to_local() and transform the forecaster to a non-
        distributed one.

        The optimizer is refreshed, so incremental training after to_local
        might have some problem.

        :return: a forecaster instance.
        """
        from bigdl.nano.pytorch.trainer import Trainer

        # TODO: optimizer is refreshed, which is not reasonable
        if not self.distributed:
            raise RuntimeError("The forecaster is already local.")
        model = self.internal.get_model()
        self.internal.shutdown()

        loss = self.loss_creator(self.loss_config)
        optimizer = self.optimizer_creator(model, self.optim_config)
        self.internal = Trainer.compile(model=model, loss=loss,
                                        optimizer=optimizer,
                                        onnx=self.onnx_available)
        self.distributed = False
        self.fitted = True
        return self
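
    # Example: a minimal sketch of the distributed-train / local-infer workflow
    # described above, assuming `forecaster` was constructed as a distributed one.
    # >>> forecaster.fit((x, y), epochs=3)        # distributed training
    # >>> forecaster.to_local()                   # transform to a local forecaster
    # >>> yhat = forecaster.predict_with_onnx(x)  # single-node onnx inference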

    def get_model(self):
        """
        Returns the learned PyTorch model.

        :return: a pytorch model instance
        """
        if self.distributed:
            return self.internal.get_model()
        else:
            return self.internal.model

    def build_onnx(self, thread_num=None, sess_options=None):
        '''
        Build an onnx model to speed up inference and reduce latency.

        The method is not required to be called before predict_with_onnx,
        evaluate_with_onnx or export_onnx_file.
        It is recommended to use when you want to:

        | 1. Strictly control the threads to be used during inference.
        | 2. Alleviate the cold start problem when you call predict_with_onnx
        |    for the first time.

        :param thread_num: int, the limit on the number of threads. The value is set
               to None by default, where no limit is set.
        :param sess_options: an onnxruntime.SessionOptions instance. If you set this
               to other than None, a new onnxruntime session will be built on this
               setting, and other settings you assigned (e.g. thread_num...) will be
               ignored.

        Example:
            >>> # to pre-build the onnx sess
            >>> forecaster.build_onnx(thread_num=1)  # build the onnxruntime sess for a single thread
            >>> pred = forecaster.predict_with_onnx(data)
            >>> # ------------------------------------------------------
            >>> # directly calling an onnx related method is also supported
            >>> pred = forecaster.predict_with_onnx(data)
        '''
        import onnxruntime
        if sess_options is not None and not isinstance(sess_options, onnxruntime.SessionOptions):
            raise RuntimeError("sess_options should be an onnxruntime.SessionOptions instance"
                               f", but found {type(sess_options)}")
        if sess_options is None:
            sess_options = onnxruntime.SessionOptions()
            if thread_num is not None:
                sess_options.intra_op_num_threads = thread_num
        if self.distributed:
            raise NotImplementedError("build_onnx has not been supported for distributed "
                                      "forecaster. You can call .to_local() to transform the "
                                      "forecaster to a non-distributed version.")
        dummy_input = torch.rand(1,
                                 self.data_config["past_seq_len"],
                                 self.data_config["input_feature_num"])
        self.internal.update_ortsess(dummy_input,
                                     sess_options=sess_options)

    def export_onnx_file(self, dirname="model.onnx"):
        """
        Save the onnx model file to the disk.

        :param dirname: The dir location you want to save the onnx file.
        """
        if self.distributed:
            raise NotImplementedError("export_onnx_file has not been supported for distributed "
                                      "forecaster. You can call .to_local() to transform the "
                                      "forecaster to a non-distributed version.")
        dummy_input = torch.rand(1,
                                 self.data_config["past_seq_len"],
                                 self.data_config["input_feature_num"])
        self.internal.update_ortsess(dummy_input,
                                     dirname=dirname)
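
    # Example: a minimal sketch of exporting the onnx file; the path is an
    # illustrative placeholder.
    # >>> forecaster.export_onnx_file(dirname="my_forecaster.onnx")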

    def quantize(self, calib_data,
                 val_data=None,
                 metric=None,
                 conf=None,
                 framework='pytorch_fx',
                 approach='static',
                 tuning_strategy='bayesian',
                 relative_drop=None,
                 absolute_drop=None,
                 timeout=0,
                 max_trials=1):
        """
        Quantize the forecaster.

        :param calib_data: A torch.utils.data.dataloader.DataLoader object for calibration.
               Required for static quantization.
        :param val_data: A torch.utils.data.dataloader.DataLoader object for evaluation.
        :param metric: A str represents the metric for tuning the quality of
               quantization. You may choose from "mse", "mae", "rmse", "r2", "mape", "smape".
        :param conf: A path to the conf yaml file for quantization. Default to None,
               using default config.
        :param framework: 'pytorch' or 'pytorch_fx'. Default to 'pytorch_fx'. Consistent with
               Intel Neural Compressor.
        :param approach: str, 'static' or 'dynamic'. Default to 'static'.
        :param tuning_strategy: str, 'bayesian', 'basic', 'mse' or 'sigopt'. Default to 'bayesian'.
        :param relative_drop: Float, tolerable relative accuracy drop. Default to None,
               e.g. set to 0.1 means that we accept a 10% increase in the metrics error.
        :param absolute_drop: Float, tolerable absolute accuracy drop. Default to None,
               e.g. set to 5 means that we can only accept metrics smaller than 5.
        :param timeout: Tuning timeout (seconds). Default to 0, which means early stop.
               Combine with the max_trials field to decide when to exit.
        :param max_trials: Max tune times. Default to 1. Combine with the timeout field
               to decide when to exit. "timeout=0, max_trials=1" means it will try quantization
               only once and return the satisfying best model.
        """
        # check model support for quantization
        if not self.quantize_available:
            raise NotImplementedError("This model has not supported quantization.")

        # Distributed forecaster does not support quantization
        if self.distributed:
            raise NotImplementedError("quantization has not been supported for distributed "
                                      "forecaster. You can call .to_local() to transform the "
                                      "forecaster to a non-distributed version.")

        # calib data should be set if the forecaster is just loaded
        assert calib_data is not None, "You must set a `calib_data` for quantization."

        # change data tuple to dataloader
        if isinstance(calib_data, tuple):
            calib_data = DataLoader(TensorDataset(torch.from_numpy(calib_data[0]),
                                                  torch.from_numpy(calib_data[1])))
        if isinstance(val_data, tuple):
            val_data = DataLoader(TensorDataset(torch.from_numpy(val_data[0]),
                                                torch.from_numpy(val_data[1])))

        # map metric str to function
        from bigdl.chronos.metric.forecast_metrics import TORCHMETRICS_REGRESSION_MAP
        if isinstance(metric, str):
            metric = TORCHMETRICS_REGRESSION_MAP[metric]

        # init acc criterion
        accuracy_criterion = None
        if relative_drop and absolute_drop:
            raise ValueError("Please unset either `relative_drop` or `absolute_drop`.")
        if relative_drop:
            accuracy_criterion = {'relative': relative_drop, 'higher_is_better': False}
        if absolute_drop:
            accuracy_criterion = {'absolute': absolute_drop, 'higher_is_better': False}

        # quantize
        self.internal = self.trainer.quantize(self.internal,
                                              calib_dataloader=calib_data,
                                              val_dataloader=val_data,
                                              metric=metric,
                                              conf=conf,
                                              framework=framework,
                                              approach=approach,
                                              tuning_strategy=tuning_strategy,
                                              accuracy_criterion=accuracy_criterion,
                                              timeout=timeout,
                                              max_trials=max_trials,
                                              raw_return=False)
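
    # Example: a minimal sketch of static quantization with an accuracy criterion.
    # The numpy tuples are converted to DataLoaders internally; x_val/y_val, the
    # metric choice and the drop value are illustrative assumptions.
    # >>> forecaster.quantize(calib_data=(x, y), val_data=(x_val, y_val),
    # ...                     metric="mse", relative_drop=0.1, max_trials=10)
    # >>> yhat_q = forecaster.predict(x, quantize=True)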