Source code for bigdl.chronos.data.tsdataset

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pandas as pd
import numpy as np
import functools
import logging

from bigdl.chronos.data.utils.feature import generate_dt_features, generate_global_features
from bigdl.chronos.data.utils.impute import impute_timeseries_dataframe
from bigdl.chronos.data.utils.deduplicate import deduplicate_timeseries_dataframe
from bigdl.chronos.data.utils.roll import roll_timeseries_dataframe
from bigdl.chronos.data.utils.time_feature import time_features, gen_time_enc_arr
from bigdl.chronos.data.utils.scale import unscale_timeseries_numpy, scale_timeseries_numpy
from bigdl.chronos.data.utils.resample import resample_timeseries_dataframe
from bigdl.chronos.data.utils.split import split_timeseries_dataframe
from bigdl.chronos.data.utils.cycle_detection import cycle_length_est
from bigdl.chronos.data.utils.quality_inspection import quality_check_timeseries_dataframe,\
    _abnormal_value_repair
from bigdl.chronos.data.utils.utils import _to_list, _check_type,\
    _check_col_within, _check_col_no_na, _check_is_aligned, _check_dt_is_sorted


_DEFAULT_ID_COL_NAME = "id"
_DEFAULT_ID_PLACEHOLDER = "0"


class TSDataset:
    def __init__(self, data, repair=False, **schema):
        '''
        TSDataset is an abstraction of a time series dataset.
        Cascade calls are supported for most of the transform methods.
        '''
        self.df = data
        # whether to use deploy mode to improve latency in production environments
        self.deploy_mode = schema["deploy_mode"]
        if not self.deploy_mode:
            # detect low-quality data and automatically repair it (optional)
            _, self.df = quality_check_timeseries_dataframe(df=self.df,
                                                            dt_col=schema["dt_col"],
                                                            id_col=schema["id_col"],
                                                            repair=repair)
        self.id_col = schema["id_col"]
        self.dt_col = schema["dt_col"]
        self.feature_col = schema["feature_col"].copy()
        self.target_col = schema["target_col"].copy()

        self.numpy_x = None
        self.numpy_y = None
        self.lookback = None  # lookback stated by users if they called roll/to_torch_data_loader
        self.horizon = None  # horizon stated by users if they called roll/to_torch_data_loader
        self.label_len = None
        self.roll_feature = None  # feature_col requested by roll/to_torch_data_loader
        self.roll_target = None  # target_col requested by roll/to_torch_data_loader
        self.roll_feature_df = None
        self.roll_additional_feature = None
        self.scaler = None
        self.scaler_index = [i for i in range(len(self.target_col))]
        self.id_sensitive = None
        self._has_generate_agg_feature = False

        if not self.deploy_mode:
            self._check_basic_invariants()

        self._id_list = list(np.unique(self.df[self.id_col]))
        self._freq_certainty = False
        self._freq = None
        self._is_pd_datetime = pd.api.types.is_datetime64_any_dtype(self.df[self.dt_col].dtypes)
        if self._is_pd_datetime:
            if len(self.df[self.dt_col]) < 2:
                self._freq = None
            else:
                self._freq = self.df[self.dt_col].iloc[1] - self.df[self.dt_col].iloc[0]

    @staticmethod
    def from_pandas(df,
                    dt_col,
                    target_col,
                    id_col=None,
                    extra_feature_col=None,
                    with_split=False,
                    val_ratio=0,
                    test_ratio=0.1,
                    repair=False,
                    deploy_mode=False):
        '''
        Initialize tsdataset(s) from a pandas dataframe.

        :param df: a pandas dataframe for your raw time series data.
        :param dt_col: a str indicates the col name of the datetime column in the input
               data frame. The dt_col must be sorted from past to latest for each id.
        :param target_col: a str or list indicates the col name of the target column
               in the input data frame.
        :param id_col: (optional) a str indicates the col name of the dataframe id. If
               it is not explicitly stated, then the data is interpreted as only
               containing a single id.
        :param extra_feature_col: (optional) a str or list indicates the col name of the
               extra feature columns that are needed to predict the target column.
        :param with_split: (optional) bool, states if we need to split the dataframe
               into train, validation and test sets. The value defaults to False.
        :param val_ratio: (optional) float, validation ratio. Only effective when
               with_split is set to True. The value defaults to 0.
        :param test_ratio: (optional) float, test ratio. Only effective when with_split
               is set to True. The value defaults to 0.1.
        :param repair: a bool indicates whether to automatically repair low-quality data,
               which may call .impute()/.resample() or modify the datetime column of the
               dataframe. The value defaults to False.
        :param deploy_mode: a bool indicates whether to use deploy mode, which is used
               in production environments to reduce the latency of data processing.
               The value defaults to False.

        :return: a TSDataset instance when with_split is set to False,
                 three TSDataset instances when with_split is set to True.

        Create a tsdataset instance by:

        >>> # Here is a df example:
        >>> # id        datetime      value   "extra feature 1"   "extra feature 2"
        >>> # 00        2019-01-01    1.9     1                   2
        >>> # 01        2019-01-01    2.3     0                   9
        >>> # 00        2019-01-02    2.4     3                   4
        >>> # 01        2019-01-02    2.6     0                   2
        >>> tsdataset = TSDataset.from_pandas(df, dt_col="datetime",
        >>>                                   target_col="value", id_col="id",
        >>>                                   extra_feature_col=["extra feature 1",
        >>>                                                      "extra feature 2"])
        '''
        if not deploy_mode:
            _check_type(df, "df", pd.DataFrame)
            tsdataset_df = df.copy(deep=True)
        else:
            tsdataset_df = df
        target_col = _to_list(target_col, name="target_col", deploy_mode=deploy_mode)
        feature_col = _to_list(extra_feature_col, name="extra_feature_col",
                               deploy_mode=deploy_mode)

        if id_col is None:
            tsdataset_df[_DEFAULT_ID_COL_NAME] = _DEFAULT_ID_PLACEHOLDER
            id_col = _DEFAULT_ID_COL_NAME

        if with_split:
            tsdataset_dfs = split_timeseries_dataframe(df=tsdataset_df,
                                                       id_col=id_col,
                                                       val_ratio=val_ratio,
                                                       test_ratio=test_ratio)
            return [TSDataset(data=tsdataset_dfs[i],
                              repair=repair,
                              id_col=id_col,
                              dt_col=dt_col,
                              target_col=target_col,
                              feature_col=feature_col,
                              deploy_mode=deploy_mode) for i in range(3)]

        return TSDataset(data=tsdataset_df,
                         repair=repair,
                         id_col=id_col,
                         dt_col=dt_col,
                         target_col=target_col,
                         feature_col=feature_col,
                         deploy_mode=deploy_mode)

    @staticmethod
    def from_parquet(path,
                     dt_col,
                     target_col,
                     id_col=None,
                     extra_feature_col=None,
                     with_split=False,
                     val_ratio=0,
                     test_ratio=0.1,
                     repair=False,
                     deploy_mode=False,
                     **kwargs):
        """
        Initialize tsdataset(s) from the path of a parquet file.

        :param path: A string path to the parquet file. The string could be a URL.
               Valid URL schemes include hdfs, http, ftp, s3, gs, and file. For file URLs, a
               host is expected. A local file could be: file://localhost/path/to/table.parquet.
               A file URL can also be a path to a directory that contains multiple
               partitioned parquet files.
        :param dt_col: a str indicates the col name of the datetime column in the input
               data frame.
        :param target_col: a str or list indicates the col name of the target column
               in the input data frame.
        :param id_col: (optional) a str indicates the col name of the dataframe id. If
               it is not explicitly stated, then the data is interpreted as only
               containing a single id.
        :param extra_feature_col: (optional) a str or list indicates the col name of the
               extra feature columns that are needed to predict the target column.
        :param with_split: (optional) bool, states if we need to split the dataframe
               into train, validation and test sets. The value defaults to False.
        :param val_ratio: (optional) float, validation ratio. Only effective when
               with_split is set to True. The value defaults to 0.
        :param test_ratio: (optional) float, test ratio. Only effective when with_split
               is set to True. The value defaults to 0.1.
        :param repair: a bool indicates whether to automatically repair low-quality data,
               which may call .impute()/.resample() or modify the datetime column of the
               dataframe. The value defaults to False.
        :param deploy_mode: a bool indicates whether to use deploy mode, which is used
               in production environments to reduce the latency of data processing.
               The value defaults to False.
        :param kwargs: Any additional kwargs are passed to pd.read_parquet
               and pyarrow.parquet.read_table.

        :return: a TSDataset instance when with_split is set to False,
                 three TSDataset instances when with_split is set to True.

        Create a tsdataset instance by:

        >>> # Here is a df example:
        >>> # id        datetime      value   "extra feature 1"   "extra feature 2"
        >>> # 00        2019-01-01    1.9     1                   2
        >>> # 01        2019-01-01    2.3     0                   9
        >>> # 00        2019-01-02    2.4     3                   4
        >>> # 01        2019-01-02    2.6     0                   2
        >>> tsdataset = TSDataset.from_parquet("hdfs://path/to/table.parquet", dt_col="datetime",
        >>>                                    target_col="value", id_col="id",
        >>>                                    extra_feature_col=["extra feature 1",
        >>>                                                       "extra feature 2"])
        """
        from bigdl.chronos.data.utils.file import parquet2pd
        columns = _to_list(dt_col, name="dt_col", deploy_mode=deploy_mode) + \
            _to_list(target_col, name="target_col", deploy_mode=deploy_mode) + \
            _to_list(id_col, name="id_col", deploy_mode=deploy_mode) + \
            _to_list(extra_feature_col, name="extra_feature_col", deploy_mode=deploy_mode)
        df = parquet2pd(path, columns=columns, **kwargs)
        return TSDataset.from_pandas(df,
                                     repair=repair,
                                     dt_col=dt_col,
                                     target_col=target_col,
                                     id_col=id_col,
                                     extra_feature_col=extra_feature_col,
                                     with_split=with_split,
                                     val_ratio=val_ratio,
                                     test_ratio=test_ratio,
                                     deploy_mode=deploy_mode)

    @staticmethod
    def from_prometheus(prometheus_url,
                        query,
                        starttime,
                        endtime,
                        step,
                        target_col=None,
                        id_col=None,
                        extra_feature_col=None,
                        with_split=False,
                        val_ratio=0,
                        test_ratio=0.1,
                        repair=False,
                        deploy_mode=False,
                        **kwargs):
        """
        Initialize tsdataset(s) from Prometheus data for a specified time period via url.

        :param prometheus_url: a str indicates the url of a Prometheus server.
        :param query: a Prometheus expression query str or list.
        :param starttime: start timestamp of the specified time period, an RFC 3339 string
               or a Unix timestamp in seconds.
        :param endtime: end timestamp of the specified time period, an RFC 3339 string
               or a Unix timestamp in seconds.
        :param step: a str indicates the query resolution step width in Prometheus duration
               format, or a float number of seconds. More information about Prometheus time
               durations is available here:
               https://prometheus.io/docs/prometheus/latest/querying/basics/#time-durations
        :param target_col: (optional) a Prometheus expression query str or list indicates the
               col name of the target column in the input data frame. If it is not explicitly
               stated, then the target column is automatically specified according to the
               Prometheus data.
        :param id_col: (optional) a Prometheus expression query str indicates the col name of
               the dataframe id. If it is not explicitly stated, then the data is interpreted
               as only containing a single id.
        :param extra_feature_col: (optional) a Prometheus expression query str or list indicates
               the col name of the extra feature columns that are needed to predict the target
               column. If it is not explicitly stated, then the extra feature column is None.
        :param with_split: (optional) bool, states if we need to split the dataframe
               into train, validation and test sets. The value defaults to False.
        :param val_ratio: (optional) float, validation ratio. Only effective when
               with_split is set to True. The value defaults to 0.
        :param test_ratio: (optional) float, test ratio. Only effective when with_split
               is set to True. The value defaults to 0.1.
        :param repair: a bool indicates whether to automatically repair low-quality data,
               which may call .impute()/.resample() or modify the datetime column of the
               dataframe. The value defaults to False.
        :param deploy_mode: a bool indicates whether to use deploy mode, which is used
               in production environments to reduce the latency of data processing.
               The value defaults to False.
        :param kwargs: Any additional kwargs are passed to the Prometheus query,
               such as timeout.

        :return: a TSDataset instance when with_split is set to False,
                 three TSDataset instances when with_split is set to True.

        Create a tsdataset instance by:

        >>> # Here is an example:
        >>> tsdataset = TSDataset.from_prometheus(prometheus_url="http://localhost:9090",
        >>>                                       query='collectd_cpufreq{cpufreq="0"}',
        >>>                                       starttime="2022-09-01T00:00:00Z",
        >>>                                       endtime="2022-10-01T00:00:00Z",
        >>>                                       step="1h")
        """
        # TODO: Corresponding unit test should be added
        # Only tested locally at present
        from bigdl.chronos.data.utils.prometheus_df import GetRangeDataframe
        query_list = _to_list(query, name="query", deploy_mode=deploy_mode)
        columns = {"target_col": _to_list(target_col, name="target_col",
                                          deploy_mode=deploy_mode),
                   "id_col": _to_list(id_col, name="id_col", deploy_mode=deploy_mode),
                   "extra_feature_col": _to_list(extra_feature_col, name="extra_feature_col",
                                                 deploy_mode=deploy_mode)}
        df, df_columns = GetRangeDataframe(prometheus_url, query_list, starttime, endtime, step,
                                           columns=columns, **kwargs)
        return TSDataset.from_pandas(df,
                                     dt_col=df_columns["dt_col"],
                                     target_col=df_columns["target_col"],
                                     id_col=df_columns["id_col"],
                                     extra_feature_col=df_columns["extra_feature_col"],
                                     with_split=with_split,
                                     val_ratio=val_ratio,
                                     test_ratio=test_ratio,
                                     repair=repair,
                                     deploy_mode=deploy_mode)

    def impute(self, mode="last", const_num=0):
        '''
        Impute the tsdataset by imputing each univariate time series
        distinguished by id_col and feature_col.

        :param mode: imputation mode, select from "last", "const" or "linear".

            "last": impute by propagating the last non-N/A number to its following N/A.
            If there is no non-N/A number ahead, 0 is filled instead.

            "const": impute by a const value input by the user.

            "linear": impute by linear interpolation.
        :param const_num: indicates the const number to fill, which is only effective
            when mode is set to "const".

        :return: the tsdataset instance.
        '''
        result = []
        groups = self.df.groupby([self.id_col])
        for _, group in groups:
            result.append(impute_timeseries_dataframe(df=group,
                                                      dt_col=self.dt_col,
                                                      mode=mode,
                                                      const_num=const_num))
        self.df = pd.concat(result, axis=0)
        self.df.reset_index(drop=True, inplace=True)
        return self

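    # Illustrative usage sketch (not part of the original source), assuming `df` is a raw
    # dataframe with missing values in its "value" column:
    #
    #   tsdata = TSDataset.from_pandas(df, dt_col="datetime", target_col="value")
    #   tsdata.impute(mode="linear")   # or mode="const", const_num=0
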
    def deduplicate(self):
        '''
        Remove duplicated records that have exactly the same values in every feature_col
        for each multivariate time series distinguished by id_col.

        :return: the tsdataset instance.
        '''
        self.df = deduplicate_timeseries_dataframe(df=self.df, dt_col=self.dt_col)
        return self

    def resample(self, interval, start_time=None, end_time=None, merge_mode="mean"):
        '''
        Resample on a new interval for each univariate time series distinguished
        by id_col and feature_col.

        :param interval: pandas offset aliases, indicating the time interval of the
               output dataframe.
        :param start_time: start time of the output dataframe.
        :param end_time: end time of the output dataframe.
        :param merge_mode: if the current interval is smaller than the output interval,
               the values need to be merged in a mode. "max", "min", "mean" or "sum"
               are supported for now.

        :return: the tsdataset instance.
        '''
        if not self.deploy_mode:
            from bigdl.nano.utils.common import invalidInputError
            invalidInputError(self._is_pd_datetime,
                              "The time series data does not have a Pandas datetime format "
                              "(you can use pandas.to_datetime to convert a string"
                              " into a datetime format).")
            from pandas.api.types import is_numeric_dtype
            type_error_list = [val for val in self.target_col + self.feature_col
                               if not is_numeric_dtype(self.df[val])]
            try:
                for val in type_error_list:
                    self.df[val] = self.df[val].astype(np.float32)
            except Exception:
                invalidInputError(False,
                                  "All the columns of target_col "
                                  "and extra_feature_col should be of numeric type.")
        self.df = self.df.groupby([self.id_col]) \
            .apply(lambda df: resample_timeseries_dataframe(df=df,
                                                            dt_col=self.dt_col,
                                                            interval=interval,
                                                            start_time=start_time,
                                                            end_time=end_time,
                                                            id_col=self.id_col,
                                                            merge_mode=merge_mode,
                                                            deploy_mode=self.deploy_mode))
        self._freq = pd.Timedelta(interval)
        self._freq_certainty = True
        self.df.reset_index(drop=True, inplace=True)
        return self

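    # Illustrative usage sketch (not part of the original source): down-sampling minute-level
    # data to hourly means, assuming `tsdata` was built via from_pandas:
    #
    #   tsdata.resample(interval="1H", merge_mode="mean")
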
    def repair_abnormal_data(self, mode="relative", threshold=3.0):
        '''
        Repair the tsdataset by replacing abnormal data, detected based on a threshold,
        with the last non-N/A number.

        :param mode: abnormal data detection mode, select from "absolute" or "relative".

            "absolute": detect abnormal data by comparing with max and min values.

            "relative": detect abnormal data by comparing with the mean value plus/minus
            several times the standard deviation.
        :param threshold: indicates the range of comparison. It is a 2-dim tuple of float
            (min_value, max_value) when mode is set to "absolute", while it is a float number
            when mode is set to "relative".

        :return: the tsdataset instance.
        '''
        self.df = _abnormal_value_repair(df=self.df,
                                         dt_col=self.dt_col,
                                         mode=mode,
                                         threshold=threshold)
        return self

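    # Illustrative usage sketch (not part of the original source):
    #
    #   tsdata.repair_abnormal_data(mode="relative", threshold=3.0)         # mean +/- 3 * std
    #   tsdata.repair_abnormal_data(mode="absolute", threshold=(0., 100.))  # allowed value range
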
    def gen_dt_feature(self, features="auto", one_hot_features=None):
        '''
        Generate datetime feature(s) for each record.

        :param features: str or list, states which feature(s) will be generated. If the value
               is set to a str, it should be one of "auto" or "all". For "auto", a subset of
               datetime features will be generated under the consideration of the sampling
               frequency of your data. For "all", the whole set of datetime features will be
               generated. If the value is set to a list, the list should contain the features
               you want to generate. A table of all datetime features and their descriptions
               is listed below. The value defaults to "auto".
        :param one_hot_features: list, states which feature(s) will be generated as a
               one-hot-encoded feature. The value defaults to None, which means no features
               will be one-hot-encoded.

        | "MINUTE": The minute of the time stamp.
        | "DAY": The day of the time stamp.
        | "DAYOFYEAR": The ordinal day of the year of the time stamp.
        | "HOUR": The hour of the time stamp.
        | "WEEKDAY": The day of the week of the time stamp, Monday=0, Sunday=6.
        | "WEEKOFYEAR": The ordinal week of the year of the time stamp.
        | "MONTH": The month of the time stamp.
        | "YEAR": The year of the time stamp.
        | "IS_AWAKE": Bool value indicating whether it belongs to awake hours for the time stamp,
        | True for hours between 6A.M. and 1A.M.
        | "IS_BUSY_HOURS": Bool value indicating whether it belongs to busy hours for the time
        | stamp, True for hours between 7A.M. and 10A.M. and hours between 4P.M. and 8P.M.
        | "IS_WEEKEND": Bool value indicating whether it belongs to weekends for the time stamp,
        | True for Saturdays and Sundays.

        :return: the tsdataset instance.
        '''
        if not self.deploy_mode:
            from bigdl.nano.utils.common import invalidInputError
            invalidInputError(self._is_pd_datetime,
                              "The time series data does not have a Pandas datetime format "
                              "(you can use pandas.to_datetime to convert a string into"
                              " a datetime format).")
        features_generated = []
        self.df = generate_dt_features(input_df=self.df,
                                       dt_col=self.dt_col,
                                       features=features,
                                       one_hot_features=one_hot_features,
                                       freq=self._freq,
                                       features_generated=features_generated)
        self.feature_col += features_generated
        return self

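    # Illustrative usage sketch (not part of the original source): generating a subset of
    # datetime features and one-hot encoding the hour:
    #
    #   tsdata.gen_dt_feature(features=["HOUR", "WEEKDAY", "IS_WEEKEND"],
    #                         one_hot_features=["HOUR"])
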
    def gen_global_feature(self, settings="comprehensive", full_settings=None, n_jobs=1):
        '''
        Generate per-time-series features for each time series.

        This method is implemented with tsfresh.
        Make sure that the specified column names do not contain '__'.

        :param settings: str or dict. If a string is set, then it must be one of
               "comprehensive", "minimal" and "efficient". If a dict is set, then it should
               follow the instruction for default_fc_parameters in tsfresh.
               The value defaults to "comprehensive".
        :param full_settings: dict. It should follow the instruction for kind_to_fc_parameters
               in tsfresh. The value defaults to None.
        :param n_jobs: int. The number of processes to use for parallelization.

        :return: the tsdataset instance.
        '''
        # TODO: relationship with scale should be figured out.
        from bigdl.nano.utils.common import invalidInputError
        try:
            from tsfresh import extract_features
            from tsfresh.feature_extraction import ComprehensiveFCParameters, \
                MinimalFCParameters, EfficientFCParameters
        except ImportError:
            invalidInputError(False,
                              "Please install tsfresh by `pip install tsfresh` to use "
                              "the `gen_global_feature` method.")
        DEFAULT_PARAMS = {"comprehensive": ComprehensiveFCParameters(),
                          "minimal": MinimalFCParameters(),
                          "efficient": EfficientFCParameters()}

        invalidInputError(not self._has_generate_agg_feature,
                          "Only one of gen_global_feature and gen_rolling_feature"
                          " should be called.")
        if full_settings is not None:
            self.df,\
                additional_feature =\
                generate_global_features(input_df=self.df,
                                         column_id=self.id_col,
                                         column_sort=self.dt_col,
                                         kind_to_fc_parameters=full_settings,
                                         n_jobs=n_jobs)
            self.feature_col += additional_feature
            return self

        if isinstance(settings, str):
            invalidInputError(settings in ['comprehensive', 'minimal', 'efficient'],
                              "settings str should be one of 'comprehensive', 'minimal',"
                              f" 'efficient', but found {settings}.")
            default_fc_parameters = DEFAULT_PARAMS[settings]
        else:
            default_fc_parameters = settings

        self.df,\
            additional_feature =\
            generate_global_features(input_df=self.df,
                                     column_id=self.id_col,
                                     column_sort=self.dt_col,
                                     default_fc_parameters=default_fc_parameters,
                                     n_jobs=n_jobs)

        self.feature_col += additional_feature
        self._has_generate_agg_feature = True
        return self

    def gen_rolling_feature(self,
                            window_size,
                            settings="comprehensive",
                            full_settings=None,
                            n_jobs=1):
        '''
        Generate aggregation features for each sample.

        This method is implemented with tsfresh.
        Make sure that the specified column names do not contain '__'.

        :param window_size: int, generate features according to the rolling result.
        :param settings: str or dict. If a string is set, then it must be one of
               "comprehensive", "minimal" and "efficient". If a dict is set, then it should
               follow the instruction for default_fc_parameters in tsfresh.
               The value defaults to "comprehensive".
        :param full_settings: dict. It should follow the instruction for kind_to_fc_parameters
               in tsfresh. The value defaults to None.
        :param n_jobs: int. The number of processes to use for parallelization.

        :return: the tsdataset instance.
        '''
        # TODO: relationship with scale should be figured out.
        from bigdl.nano.utils.common import invalidInputError
        try:
            from tsfresh.utilities.dataframe_functions import roll_time_series
            from tsfresh.utilities.dataframe_functions import impute as impute_tsfresh
            from tsfresh import extract_features
            from tsfresh.feature_extraction import ComprehensiveFCParameters, \
                MinimalFCParameters, EfficientFCParameters
        except ImportError:
            invalidInputError(False,
                              "Please install tsfresh by `pip install tsfresh` to use "
                              "the `gen_rolling_feature` method.")
        DEFAULT_PARAMS = {"comprehensive": ComprehensiveFCParameters(),
                          "minimal": MinimalFCParameters(),
                          "efficient": EfficientFCParameters()}

        invalidInputError(not self._has_generate_agg_feature,
                          "Only one of gen_global_feature and gen_rolling_feature"
                          " should be called.")
        if isinstance(settings, str):
            invalidInputError(settings in ['comprehensive', 'minimal', 'efficient'],
                              "settings str should be one of 'comprehensive', 'minimal',"
                              f" 'efficient', but found {settings}.")
            default_fc_parameters = DEFAULT_PARAMS[settings]
        else:
            default_fc_parameters = settings
        invalidInputError(window_size < self.df.groupby(self.id_col).size().min() + 1,
                          "gen_rolling_feature should have a window_size smaller"
                          " than the shortest time series length.")
        df_rolled = roll_time_series(self.df,
                                     column_id=self.id_col,
                                     column_sort=self.dt_col,
                                     max_timeshift=window_size - 1,
                                     min_timeshift=window_size - 1,
                                     n_jobs=n_jobs)
        if not full_settings:
            self.roll_feature_df = extract_features(
                df_rolled,
                column_id=self.id_col,
                column_sort=self.dt_col,
                default_fc_parameters=default_fc_parameters,
                n_jobs=n_jobs)
        else:
            self.roll_feature_df = extract_features(
                df_rolled,
                column_id=self.id_col,
                column_sort=self.dt_col,
                kind_to_fc_parameters=full_settings,
                n_jobs=n_jobs)
        impute_tsfresh(self.roll_feature_df)

        self.feature_col += list(self.roll_feature_df.columns)
        self.roll_additional_feature = list(self.roll_feature_df.columns)
        self._has_generate_agg_feature = True
        return self

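    # Illustrative usage sketch (not part of the original source), assuming tsfresh is
    # installed and the shortest series is longer than the window:
    #
    #   tsdata.gen_rolling_feature(window_size=24, settings="minimal", n_jobs=1)
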
    def roll(self,
             horizon,
             lookback='auto',
             feature_col=None,
             target_col=None,
             id_sensitive=False,
             time_enc=False,
             label_len=0,
             is_predict=False):
        '''
        Sampling by rolling for machine learning/deep learning models.

        :param lookback: int, lookback value. Defaults to 'auto', in which case the mode of
               the time series' cycle length will be taken as the lookback.
        :param horizon: int or list.
               If `horizon` is an int, we will sample `horizon` steps
               continuously after the forecasting point.
               If `horizon` is a list, we will sample discretely according
               to the input list. 1 means the timestamp just after the observed data.
               Specially, when `horizon` is set to 0, ground truth will be generated as None.
        :param feature_col: str or list, indicates the feature col name. Defaults to None,
               where we will take all available features in rolling.
        :param target_col: str or list, indicates the target col name. Defaults to None,
               where we will take all targets in rolling. It should be a subset of the
               target_col you used to initialize the tsdataset.
        :param id_sensitive: bool.
               If `id_sensitive` is False, we will roll on each id's sub dataframe
               and fuse the samplings.
               The shape of rolling will be
               x: (num_sample, lookback, num_feature_col + num_target_col)
               y: (num_sample, horizon + label_len, num_target_col)
               where num_sample is the summation of the sample numbers of each dataframe.
               If `id_sensitive` is True, we will roll on the wide dataframe whose
               columns are the cartesian product of id_col and feature_col.
               The shape of rolling will be
               x: (num_sample, lookback, new_num_feature_col + new_num_target_col)
               y: (num_sample, horizon + label_len, new_num_target_col)
               where num_sample is the sample number of the wide dataframe,
               new_num_feature_col is the product of the number of ids and the number of
               feature_col, and new_num_target_col is the product of the number of ids
               and the number of target_col.
        :param time_enc: bool. This parameter should be set to True only when you are using
               the Autoformer model. With time_enc set to True, 2 additional numpy ndarrays
               will be returned when you call `.to_numpy()`. Be sure to have a time type for
               dt_col if you set time_enc to True.
        :param label_len: int. This parameter should be set only when you are using the
               Autoformer model. It indicates the length of the overlap area of output(y)
               and input(x) on the time axis.
        :param is_predict: bool. This parameter indicates if the dataset will be sampled as
               a prediction dataset (without ground truth).

        :return: the tsdataset instance.

        roll() can be called by:

        >>> # Here is a df example:
        >>> # id        datetime      value   "extra feature 1"   "extra feature 2"
        >>> # 00        2019-01-01    1.9     1                   2
        >>> # 01        2019-01-01    2.3     0                   9
        >>> # 00        2019-01-02    2.4     3                   4
        >>> # 01        2019-01-02    2.6     0                   2
        >>> tsdataset = TSDataset.from_pandas(df, dt_col="datetime",
        >>>                                   target_col="value", id_col="id",
        >>>                                   extra_feature_col=["extra feature 1",
        >>>                                                      "extra feature 2"])
        >>> horizon, lookback = 1, 1
        >>> tsdataset.roll(lookback=lookback, horizon=horizon, id_sensitive=False)
        >>> x, y = tsdataset.to_numpy()
        >>> print(x, y) # x = [[[1.9, 1, 2 ]], [[2.3, 0, 9 ]]] y = [[[ 2.4 ]], [[ 2.6 ]]]
        >>> print(x.shape, y.shape) # x.shape = (2, 1, 3) y.shape = (2, 1, 1)
        >>> tsdataset.roll(lookback=lookback, horizon=horizon, id_sensitive=True)
        >>> x, y = tsdataset.to_numpy()
        >>> print(x, y) # x = [[[ 1.9, 2.3, 1, 2, 0, 9 ]]] y = [[[ 2.4, 2.6]]]
        >>> print(x.shape, y.shape) # x.shape = (1, 1, 6) y.shape = (1, 1, 2)
        '''
        if not self.deploy_mode:
            from bigdl.nano.utils.common import invalidInputError
            if id_sensitive and not _check_is_aligned(self.df, self.id_col, self.dt_col):
                invalidInputError(False,
                                  "The time series data should be "
                                  "aligned if id_sensitive is set to True.")
        else:
            is_predict = True
        feature_col = _to_list(feature_col, "feature_col", deploy_mode=self.deploy_mode) \
            if feature_col is not None else self.feature_col
        target_col = _to_list(target_col, "target_col", deploy_mode=self.deploy_mode) \
            if target_col is not None else self.target_col

        if self.roll_additional_feature:
            additional_feature_col =\
                list(set(feature_col).intersection(set(self.roll_additional_feature)))
            feature_col =\
                list(set(feature_col) - set(self.roll_additional_feature))
            self.roll_feature = feature_col + additional_feature_col
        else:
            additional_feature_col = None
            self.roll_feature = feature_col

        self.roll_target = target_col
        num_id = len(self._id_list)
        num_feature_col = len(self.roll_feature)
        num_target_col = len(self.roll_target)
        self.id_sensitive = id_sensitive
        roll_feature_df = None if self.roll_feature_df is None \
            else self.roll_feature_df[additional_feature_col]
        if time_enc and label_len == 0:
            label_len = max(lookback // 2, 1)
        self.lookback, self.horizon, self.label_len = lookback, horizon, label_len

        # horizon_time is only for time_enc; the time_enc numpy ndarray won't have any
        # shape change when the dataset is for prediction.
        horizon_time = self.horizon
        if is_predict:
            self.horizon = 0

        if self.lookback == 'auto':
            self.lookback = self.get_cycle_length('mode', top_k=3)

        groups = self.df.groupby([self.id_col])
        rolling_result = []
        for _, group in groups:
            rolling_result.append(roll_timeseries_dataframe(df=group,
                                                            roll_feature_df=roll_feature_df,
                                                            lookback=self.lookback,
                                                            horizon=self.horizon,
                                                            feature_col=feature_col,
                                                            target_col=target_col,
                                                            label_len=label_len,
                                                            deploy_mode=self.deploy_mode))

        # concat the result on the required axis
        concat_axis = 2 if id_sensitive else 0
        self.numpy_x = np.concatenate([rolling_result[i][0]
                                       for i in range(len(self._id_list))],
                                      axis=concat_axis).astype(np.float32)
        if (horizon != 0 and is_predict is False) or time_enc:
            self.numpy_y = np.concatenate([rolling_result[i][1]
                                           for i in range(len(self._id_list))],
                                          axis=concat_axis).astype(np.float32)
        else:
            self.numpy_y = None

        # time_enc
        if time_enc:
            time_enc_arr = []
            for _, group in groups:
                time_enc_arr.append(gen_time_enc_arr(df=group,
                                                     dt_col=self.dt_col,
                                                     freq=self._freq,
                                                     horizon_time=horizon_time,
                                                     is_predict=is_predict,
                                                     lookback=lookback,
                                                     label_len=label_len))
            self.numpy_x_timeenc = np.concatenate([time_enc_arr[i][0]
                                                   for i in range(len(self._id_list))],
                                                  axis=0).astype(np.float32)
            self.numpy_y_timeenc = np.concatenate([time_enc_arr[i][1]
                                                   for i in range(len(self._id_list))],
                                                  axis=0).astype(np.float32)
        else:
            self.numpy_x_timeenc = None
            self.numpy_y_timeenc = None

        # target first
        # TODO: check id_sensitive effectiveness for the time_enc=True cases.
        if self.id_sensitive:
            feature_start_idx = num_target_col * num_id
            reindex_list = [list(range(i * num_target_col, (i + 1) * num_target_col)) +
                            list(range(feature_start_idx + i * num_feature_col,
                                       feature_start_idx + (i + 1) * num_feature_col))
                            for i in range(num_id)]
            reindex_list = functools.reduce(lambda a, b: a + b, reindex_list)
            sorted_index = sorted(range(len(reindex_list)), key=reindex_list.__getitem__)
            self.numpy_x = self.numpy_x[:, :, sorted_index]

        # scaler index
        num_roll_target = len(self.roll_target)
        repeat_factor = len(self._id_list) if self.id_sensitive else 1
        scaler_index = [self.target_col.index(self.roll_target[i])
                        for i in range(num_roll_target)] * repeat_factor
        self.scaler_index = scaler_index

        return self

    def to_torch_data_loader(self,
                             batch_size=32,
                             roll=True,
                             lookback='auto',
                             horizon=None,
                             feature_col=None,
                             target_col=None,
                             shuffle=True,
                             time_enc=False,
                             label_len=0,
                             is_predict=False):
        """
        Convert TSDataset to a PyTorch DataLoader with or without rolling. We recommend using
        to_torch_data_loader (with the default roll=True) if you don't need the rolled numpy
        array. It is much more efficient than rolling separately, especially when the dataframe
        or lookback is large.

        :param batch_size: int, the batch_size for a Pytorch DataLoader. It defaults to 32.
        :param roll: Boolean. Whether to roll the dataframe before converting it to a
               DataLoader. If True, you must also specify lookback and horizon for rolling.
               If False, you must have called tsdataset.roll() before calling
               to_torch_data_loader(). Defaults to True.
        :param lookback: int, lookback value. Defaults to 'auto', in which case the mode of
               the time series' cycle length will be taken as the lookback.
        :param horizon: int or list.
               If `horizon` is an int, we will sample `horizon` steps
               continuously after the forecasting point.
               If `horizon` is a list, we will sample discretely according
               to the input list.
               Specially, when `horizon` is set to 0, ground truth will be generated as None.
        :param feature_col: str or list, indicates the feature col name. Defaults to None,
               where we will take all available features in rolling.
        :param target_col: str or list, indicates the target col name. Defaults to None,
               where we will take all targets in rolling. It should be a subset of the
               target_col you used to initialize the tsdataset.
        :param shuffle: if the dataloader is shuffled. Defaults to True.
        :param time_enc: bool. This parameter should be set to True only when you are using
               the Autoformer model. With time_enc set to True, 2 additional numpy ndarrays
               will be returned when you call `.to_numpy()`. Be sure to have a time type for
               dt_col if you set time_enc to True.
        :param label_len: int. This parameter should be set only when you are using the
               Autoformer model. It indicates the length of the overlap area of output(y)
               and input(x) on the time axis.
        :param is_predict: bool. This parameter should be set to True only when you are
               processing test data without accuracy evaluation. It indicates if the dataset
               will be sampled as a prediction dataset (without ground truth).

        :return: A PyTorch DataLoader instance. The data returned from the dataloader is in
                 the following form:

                 1. a 3d numpy ndarray when is_predict=True or horizon=0, and time_enc=False

                 2. a 2-dim tuple of 3d numpy ndarray (x, y) when is_predict=False and
                    horizon != 0 and time_enc=False

                 3. a 4-dim tuple of 3d numpy ndarray (x, y, x_enc, y_enc) when time_enc=True

        to_torch_data_loader() can be called by:

        >>> # Here is a df example:
        >>> # id        datetime      value   "extra feature 1"   "extra feature 2"
        >>> # 00        2019-01-01    1.9     1                   2
        >>> # 01        2019-01-01    2.3     0                   9
        >>> # 00        2019-01-02    2.4     3                   4
        >>> # 01        2019-01-02    2.6     0                   2
        >>> tsdataset = TSDataset.from_pandas(df, dt_col="datetime",
        >>>                                   target_col="value", id_col="id",
        >>>                                   extra_feature_col=["extra feature 1",
        >>>                                                      "extra feature 2"])
        >>> horizon, lookback = 1, 1
        >>> data_loader = tsdataset.to_torch_data_loader(batch_size=32,
        >>>                                              lookback=lookback,
        >>>                                              horizon=horizon)
        >>> # or roll outside. That might be less efficient than the way above.
        >>> tsdataset.roll(lookback=lookback, horizon=horizon, id_sensitive=False)
        >>> x, y = tsdataset.to_numpy()
        >>> print(x, y) # x = [[[1.9, 1, 2 ]], [[2.3, 0, 9 ]]] y = [[[ 2.4 ]], [[ 2.6 ]]]
        >>> data_loader = tsdataset.to_torch_data_loader(batch_size=32, roll=False)
        """
        from torch.utils.data import TensorDataset, DataLoader
        import torch
        from bigdl.nano.utils.common import invalidInputError
        if roll:
            if horizon is None:
                invalidInputError(False,
                                  "You must input horizon if roll is True (default roll=True)!")
            from bigdl.chronos.data.utils.roll_dataset import RollDataset
            feature_col = _to_list(feature_col, "feature_col") if feature_col is not None \
                else self.feature_col
            target_col = _to_list(target_col, "target_col") if target_col is not None \
                else self.target_col

            if time_enc and label_len == 0:
                label_len = max(lookback // 2, 1)

            # set scaler index for unscale_numpy
            self.scaler_index = [self.target_col.index(t) for t in target_col]
            self.lookback, self.horizon, self.label_len = lookback, horizon, label_len

            if self.lookback == 'auto':
                self.lookback = self.get_cycle_length('mode', top_k=3)
            invalidInputError(not self._has_generate_agg_feature,
                              "Currently to_torch_data_loader does not support "
                              "'gen_global_feature' and 'gen_rolling_feature' methods.")

            if isinstance(self.horizon, int):
                need_dflen = self.lookback + self.horizon
            else:
                need_dflen = self.lookback + max(self.horizon)
            if len(self.df) < need_dflen:
                invalidInputError(False,
                                  "The length of the dataset must be larger than the sum "
                                  "of lookback and horizon, but got lookback+horizon="
                                  f"{need_dflen} while the length of the dataset "
                                  f"is {len(self.df)}.")

            torch_dataset = RollDataset(self.df,
                                        dt_col=self.dt_col,
                                        freq=self._freq,
                                        lookback=self.lookback,
                                        horizon=self.horizon,
                                        feature_col=feature_col,
                                        target_col=target_col,
                                        id_col=self.id_col,
                                        time_enc=time_enc,
                                        label_len=label_len,
                                        is_predict=is_predict)
            # TODO: gen_rolling_feature and gen_global_feature will be supported later
            self.roll_target = target_col
            self.roll_feature = feature_col

            batch_size = 32 if batch_size is None else batch_size  # _pytorch_fashion_inference
            return DataLoader(torch_dataset,
                              batch_size=batch_size,
                              shuffle=shuffle)
        else:
            if self.numpy_x is None:
                invalidInputError(False,
                                  "Please call the 'roll' method before transforming a "
                                  "TSDataset to a torch DataLoader if roll is False!")
            if self.numpy_y is None:
                x = self.numpy_x
                return DataLoader(TensorDataset(torch.from_numpy(x).float()),
                                  batch_size=batch_size,
                                  shuffle=shuffle)
            elif self.numpy_x_timeenc is None:
                x, y = self.to_numpy()
                return DataLoader(TensorDataset(torch.from_numpy(x).float(),
                                                torch.from_numpy(y).float()),
                                  batch_size=batch_size,
                                  shuffle=shuffle)
            else:
                x, y, x_enc, y_enc = self.to_numpy()
                return DataLoader(TensorDataset(torch.from_numpy(x).float(),
                                                torch.from_numpy(y).float(),
                                                torch.from_numpy(x_enc).float(),
                                                torch.from_numpy(y_enc).float()),
                                  batch_size=batch_size,
                                  shuffle=shuffle)

    def to_tf_dataset(self, batch_size=32, shuffle=False):
        """
        Export a tf.data Dataset whose elements are slices of the rolled tensors.

        :param batch_size: Number of samples per batch of computation.
               If unspecified, batch_size will default to 32.
        :param shuffle: if the dataset is shuffled. Defaults to False.

        :return: a tf.data dataset, including x and y.
        """
        # TODO: requires a tf dataset creator method that can use less memory.
        import tensorflow as tf
        from bigdl.nano.utils.common import invalidInputError
        if self.numpy_x is None:
            invalidInputError(False,
                              "Please call the 'roll' method "
                              "before transforming a TSDataset to a tf dataset!")
        data = tf.data.Dataset.from_tensor_slices((self.numpy_x, self.numpy_y))
        batch_size = 32 if batch_size is None else batch_size
        if shuffle:
            data = data.cache().shuffle(self.numpy_x.shape[0]).batch(batch_size)
        else:
            data = data.batch(batch_size).cache()
        return data.prefetch(tf.data.AUTOTUNE)

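    # Illustrative usage sketch (not part of the original source): roll first, then export
    # a batched tf.data dataset for training:
    #
    #   tsdata.roll(lookback=24, horizon=1)
    #   train_ds = tsdata.to_tf_dataset(batch_size=32, shuffle=True)
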
    def to_numpy(self):
        '''
        Export the rolling result in the form of:

        1. a 3d numpy ndarray when is_predict=True or horizon=0, and time_enc=False

        2. a 2-dim tuple of 3d numpy ndarray (x, y) when is_predict=False and horizon != 0
           and time_enc=False

        3. a 4-dim tuple of 3d numpy ndarray (x, y, x_enc, y_enc) when time_enc=True

        :return: a 3d numpy ndarray when is_predict=True or horizon=0, and time_enc=False,
                 or a 2-dim tuple of 3d numpy ndarray (x, y) when is_predict=False and
                 horizon != 0 and time_enc=False,
                 or a 4-dim tuple of 3d numpy ndarray (x, y, x_enc, y_enc) when time_enc=True.
                 The ndarray is cast to float32.
        '''
        if not self.deploy_mode:
            from bigdl.nano.utils.common import invalidInputError
            if self.numpy_x is None:
                invalidInputError(False,
                                  "Please call the 'roll' method "
                                  "before transforming a TSDataset to a numpy ndarray!")
        if self.numpy_y is None and self.numpy_x_timeenc is None:
            return self.numpy_x
        elif self.numpy_x_timeenc is None:
            return self.numpy_x, self.numpy_y
        else:
            return self.numpy_x, self.numpy_y, self.numpy_x_timeenc, self.numpy_y_timeenc

    def to_pandas(self):
        '''
        Export the pandas dataframe.

        :return: a copy of the internal dataframe.
        '''
        return self.df.copy()

    def scale(self, scaler, fit=True):
        '''
        Scale the time series dataset's feature column and target column.

        :param scaler: sklearn scaler instance. StandardScaler, MaxAbsScaler,
               MinMaxScaler and RobustScaler are supported.
        :param fit: if we need to fit the scaler. Typically, the value should be set to True
               for the training set, and False for the validation and test sets. The value
               defaults to True.

        :return: the tsdataset instance.

        Assume there is a training set tsdata and a test set tsdata_test.
        scale() should be called first on the training set with the default value fit=True,
        then be called on the test set with the same scaler and fit=False.

        >>> from sklearn.preprocessing import StandardScaler
        >>> scaler = StandardScaler()
        >>> tsdata.scale(scaler, fit=True)
        >>> tsdata_test.scale(scaler, fit=False)
        '''
        feature_col = self.feature_col
        if self.roll_additional_feature:
            feature_col = []
            for feature in self.feature_col:
                if feature not in self.roll_additional_feature:
                    feature_col.append(feature)
        if fit and not self.deploy_mode:
            self.df[self.target_col + feature_col] = \
                scaler.fit_transform(self.df[self.target_col + feature_col])
        else:
            if not self.deploy_mode:
                from sklearn.utils.validation import check_is_fitted
                from bigdl.nano.utils.common import invalidInputError
                try:
                    invalidInputError(not check_is_fitted(scaler), "scaler is not fitted")
                except Exception:
                    invalidInputError(False,
                                      "When calling scale for the first time, "
                                      "you need to set fit=True.")
            self.df[self.target_col + feature_col] = \
                scale_timeseries_numpy(self.df[self.target_col + feature_col].values, scaler)
        self.scaler = scaler
        return self

    def unscale(self):
        '''
        Unscale the time series dataset's feature column and target column.

        :return: the tsdataset instance.
        '''
        feature_col = self.feature_col
        if self.roll_additional_feature:
            feature_col = []
            for feature in self.feature_col:
                if feature not in self.roll_additional_feature:
                    feature_col.append(feature)
        self.df[self.target_col + feature_col] = \
            self.scaler.inverse_transform(self.df[self.target_col + feature_col])
        return self

    def unscale_numpy(self, data):
        '''
        Unscale the time series forecaster's numpy prediction result/ground truth.

        :param data: a numpy ndarray with 3 dims whose shape should be exactly the same
               as self.numpy_y.

        :return: the unscaled numpy ndarray.
        '''
        return unscale_timeseries_numpy(data, self.scaler, self.scaler_index)

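    # Illustrative usage sketch (not part of the original source): unscaling a prediction back
    # to the original value range, assuming `forecaster` is a model trained on the scaled data
    # and `tsdata_test` was scaled with the same scaler:
    #
    #   pred = forecaster.predict(tsdata_test.to_numpy()[0])
    #   pred_unscaled = tsdata_test.unscale_numpy(pred)
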
    def _check_basic_invariants(self, strict_check=False):
        '''
        This function contains a bunch of assertions to make sure the strict rules
        (the invariants) for the internal dataframe (self.df) always hold. If not,
        a clear and user-friendly error or warning message should be provided to the users.
        This function will be called after each method (e.g. impute, deduplicate ...).
        '''
        # check type
        _check_type(self.df, "df", pd.DataFrame)
        _check_type(self.id_col, "id_col", str)
        _check_type(self.dt_col, "dt_col", str)
        _check_type(self.target_col, "target_col", list)
        _check_type(self.feature_col, "feature_col", list)

        # check valid name
        _check_col_within(self.df, self.id_col)
        _check_col_within(self.df, self.dt_col)
        for target_col_name in self.target_col:
            _check_col_within(self.df, target_col_name)
        for feature_col_name in self.feature_col:
            if self.roll_additional_feature and feature_col_name in self.roll_additional_feature:
                continue
            _check_col_within(self.df, feature_col_name)

        # check no n/a in critical col
        _check_col_no_na(self.df, self.dt_col)
        _check_col_no_na(self.df, self.id_col)

        # check dt sorted
        if strict_check:
            _check_dt_is_sorted(self.df, self.dt_col)

    def get_cycle_length(self, aggregate='mode', top_k=3):
        """
        Calculate the cycle length of the time series in this TSDataset.

        Args:
            top_k (int): The freqs with the top top_k power after fft will be used to check
                the autocorrelation. A higher top_k might be time-consuming.
                The value defaults to 3.
            aggregate (str): Select the mode of calculating the time period. We only support
                'min', 'max', 'mode', 'median', 'mean'.

        Returns:
            An aggregated value describing the time period distribution.
        """
        from bigdl.nano.utils.common import invalidInputError
        invalidInputError(isinstance(top_k, int),
                          f"top_k type must be int, but found {type(top_k)}.")
        invalidInputError(isinstance(aggregate, str),
                          f"aggregate type must be str, but found {type(aggregate)}.")
        invalidInputError(aggregate.lower().strip() in ['min', 'max', 'mode', 'median', 'mean'],
                          "We only support 'min', 'max', 'mode', 'median', 'mean',"
                          f" but found {aggregate}.")

        if len(self.target_col) == 1:
            res = []
            groups = self.df.groupby(self.id_col)
            for _, group in groups:
                res.append(cycle_length_est(group[self.target_col[0]].values, top_k))
            res = pd.Series(res)
        else:
            res = []
            groups = self.df.groupby(self.id_col)
            for _, group in groups:
                res.append(pd.DataFrame({'cycle_length':
                                         [cycle_length_est(group[col].values, top_k)
                                          for col in self.target_col]}))
            res = pd.concat(res, axis=0)
            res = res.cycle_length

        if aggregate.lower().strip() == 'mode':
            self.best_cycle_length = int(res.value_counts().index[0])
        elif aggregate.lower().strip() == 'mean':
            self.best_cycle_length = int(res.mean())
        elif aggregate.lower().strip() == 'median':
            self.best_cycle_length = int(res.median())
        elif aggregate.lower().strip() == 'min':
            self.best_cycle_length = int(res.min())
        elif aggregate.lower().strip() == 'max':
            self.best_cycle_length = int(res.max())

        return self.best_cycle_length

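    # Illustrative usage sketch (not part of the original source): estimating the dominant
    # cycle length and reusing it as the lookback for rolling:
    #
    #   lookback = tsdata.get_cycle_length(aggregate="mode", top_k=3)
    #   tsdata.roll(lookback=lookback, horizon=1)
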
    def export_jit(self, path_dir=None, drop_dt_col=True):
        """
        Export the data processing pipeline to torchscript so that it can be used without a
        Python environment. For example, when you are deploying a trained model in C++ and need
        to process input data, you can call this method to get a torchscript module containing
        the data processing pipeline and save it in a .pt file when you finish developing the
        model. When deploying, you can load the torchscript module from the .pt file and run the
        data processing pipeline in C++ using libtorch APIs, and the output tensor can be fed
        into the trained model for inference.

        Currently we support exporting preprocessing (scale and roll) and postprocessing
        (unscale) to torchscript. They do the same thing as the following code:

        >>> # preprocess
        >>> tsdata.scale(scaler, fit=False) \\
        >>>       .roll(lookback, horizon, is_predict=True)
        >>> preprocess_output = tsdata.to_numpy()
        >>> # postprocess
        >>> # "data" can be the output of model inference
        >>> postprocess_output = tsdata.unscale_numpy(data)

        Preprocessing and postprocessing are converted to separate torchscript modules, so two
        modules will be returned and saved.

        When deploying, the compiled torchscript modules can be used by:

        >>> // deployment in C++
        >>> #include <torch/torch.h>
        >>> #include <torch/script.h>
        >>> // create input tensor from your data
        >>> // the data to create the input tensor should have the same format as the
        >>> // data used in developing
        >>> torch::Tensor input_tensor = create_input_tensor(data);
        >>> // load the module
        >>> torch::jit::script::Module preprocessing;
        >>> preprocessing = torch::jit::load(preprocessing_path);
        >>> // run data preprocessing
        >>> torch::Tensor preprocessing_output = preprocessing.forward(input_tensor).toTensor();
        >>> // inference using your trained model
        >>> torch::Tensor inference_output = trained_model(preprocessing_output)
        >>> // load the postprocessing module
        >>> torch::jit::script::Module postprocessing;
        >>> postprocessing = torch::jit::load(postprocessing_path);
        >>> // run postprocessing
        >>> torch::Tensor output = postprocessing.forward(inference_output).toTensor()

        Currently there are some limitations:

        1. Please make sure the value of each column can be converted to a Pytorch tensor.
           For example, id "00" is not allowed because a str can not be converted to a tensor;
           you should use integers (0, 1, ...) as ids instead of strings.

        2. Some features in tsdataset.scale and tsdataset.roll are unavailable in this
           pipeline:

           a. If self.roll_additional_feature is not None, it can't be processed in scale
              and roll.

           b. The id_sensitive, time_enc and label_len parameters are not supported in roll.

        3. Users are expected to call .scale(scaler, fit=True) before calling export_jit.
           A single roll operation is not supported for converting now.

        :param path_dir: The path to save the compiled torchscript modules, defaults to None.
               If set to None, you should call torch.jit.save() in your code to save the
               returned modules; if not None, the path should be a directory, and the modules
               will be saved at "path_dir/tsdata_preprocessing.pt" and
               "path_dir/tsdata_postprocessing.pt".
        :param drop_dt_col: Whether to delete the datetime column, defaults to True. Since
               datetime values (like "2022-12-12") can't be converted to a Pytorch tensor, you
               can choose different ways to work around this. If set to True, the datetime
               column will be deleted; then you also need to skip the datetime column when
               reading data from the data source (like csv files) in the deployment environment
               to keep the same structure as the data used in development. If set to False,
               the datetime column will not be deleted, and you need to make sure the datetime
               column can be successfully converted to a Pytorch tensor when reading data in
               the deployment environment. For example, you can set each value in the datetime
               column to an int (or other valid type); since the datetime column is not
               necessary in preprocessing and postprocessing, the value can be arbitrary.

        :return: A tuple (preprocessing_module, postprocessing_module) containing the compiled
                 torchscript modules.
        """
        from bigdl.chronos.data.utils.export_torchscript \
            import export_processing_to_jit, get_index
        import torch
        import os

        if drop_dt_col:
            self.df.drop(columns=self.dt_col, inplace=True)

        # target_feature_index: index of target col and feature col, will be used in scale
        # and roll
        id_index, target_feature_index = get_index(self.df, self.id_col,
                                                   self.target_col, self.feature_col)

        preprocessing_module = export_processing_to_jit(self.scaler, self.lookback, id_index,
                                                        target_feature_index,
                                                        self.scaler_index, "preprocessing")
        postprocessing_module = export_processing_to_jit(self.scaler, self.lookback, id_index,
                                                         target_feature_index,
                                                         self.scaler_index, "postprocessing")

        if path_dir:
            preprocess_path = os.path.join(path_dir, "tsdata_preprocessing.pt")
            postprocess_path = os.path.join(path_dir, "tsdata_postprocessing.pt")
            torch.jit.save(preprocessing_module, preprocess_path)
            torch.jit.save(postprocessing_module, postprocess_path)

        return preprocessing_module, postprocessing_module